You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2011/09/21 17:02:03 UTC

svn commit: r1173698 [1/2] - /river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/

Author: peter_firmstone
Date: Wed Sep 21 15:02:02 2011
New Revision: 1173698

URL: http://svn.apache.org/viewvc?rev=1173698&view=rev
Log:
Reference Collection Utilities

Added:
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftIdentityReferenceKey.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftReferenceKey.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/StrongReferenceKey.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakIdentityReferenceKey.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakReferenceKey.java   (with props)
Removed:
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ConcurrentCollections.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ConcurrentSoftMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ConcurrentWeakIdentityMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ConcurrentWeakMap.java

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+public class CollectionsConcurrent {
+    // Not instantiable
+    private CollectionsConcurrent() {}
+
+    public static <T> Set<T> multiReadSet(Set<T> s) {
+	return new MultiReadSet<T>(s);
+    }
+    
+    public static <T> Collection<T> multiReadCollection(Collection<T> c){
+	return new MultiReadCollection<T>(c);
+    }
+    
+    static class MultiReadCollection<E> implements Collection<E>, Serializable {
+	private static final long serialVersionUID = 1L;
+
+	final Collection<E> c;  // Backing Collection
+	final Object mutex;     // Object on which to synchronize
+	final ReadWriteLock rwlock;
+	final Lock rl;
+	final Lock wl;
+
+	MultiReadCollection(Collection<E> c) {
+            if (c==null) {
+		throw new NullPointerException();
+	    }
+	    this.c = c;
+            mutex = this;
+	    rwlock = new ReentrantReadWriteLock();
+	    rl = rwlock.readLock();
+	    wl = rwlock.writeLock();
+        }
+
+	Lock getReadLock(){
+	    return rl;
+	}
+	
+	Lock getWriteLock(){
+	    return rl;
+	}
+	
+	public int size() {
+	    rl.lock();
+	    try {
+		return c.size();
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+	public boolean isEmpty() {
+	    rl.lock();
+	    try {
+		return c.isEmpty();
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+	public boolean contains(Object o) {
+	    rl.lock();
+	    try {
+		return c.contains(o);
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+	public Object[] toArray() {
+	    rl.lock();
+	    try {
+		return c.toArray();
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+	public <T> T[] toArray(T[] a) {
+	    rl.lock();
+	    try {
+		return c.toArray(a);
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+
+	public Iterator<E> iterator() {
+	    rl.lock();
+	    try {
+		return new CollectionIterator<E>(this);
+	    }
+	    finally {
+		rl.unlock();
+	    }
+        }
+
+	public boolean add(E e) {
+	    wl.lock();
+	    try{
+		return c.add(e);
+	    }finally{
+		wl.unlock();
+	    }
+        }
+	public boolean remove(Object o) {
+	    wl.lock();
+	    try{
+		return c.remove(o);
+	    }finally{
+		wl.unlock();
+	    }
+        }
+
+	public boolean containsAll(Collection<?> coll) {
+	    rl.lock();
+	    try{
+		return c.containsAll(coll);
+	    }finally{
+		rl.unlock();
+	    }
+        }
+	public boolean addAll(Collection<? extends E> coll) {
+	    wl.lock();
+	    try{
+		return c.addAll(coll);
+	    }finally{
+		wl.unlock();
+	    }
+        }
+	public boolean removeAll(Collection<?> coll) {
+	    wl.lock();
+	    try{
+		return c.removeAll(coll);
+	    }finally{
+		wl.unlock();
+	    }
+        }
+	public boolean retainAll(Collection<?> coll) {
+	    wl.lock();
+	    try{
+		return c.retainAll(coll);
+	    }finally{
+		wl.unlock();
+	    }
+        }
+	public void clear() {
+	    wl.lock();
+	    try{
+		c.clear();
+	    }finally{
+		wl.unlock();
+	    }
+        }
+	@Override
+	public String toString() {
+	    wl.lock();
+	    try{
+		return c.toString();
+	    }finally{
+		wl.unlock();
+	    }
+        }
+        private void writeObject(ObjectOutputStream s) throws IOException {
+	    rl.lock();
+	    try{
+		s.defaultWriteObject();
+	    }finally{
+		rl.unlock();
+	    }
+        }
+    }
+    
+    static class CollectionIterator<E> implements Iterator<E> {
+	final MultiReadCollection col;
+	final Iterator<E> iter;
+	volatile E element;
+	CollectionIterator(MultiReadCollection<E> c){
+	    col = c;
+	    Collection<E> copy = new ArrayList<E>(c.size());
+	    copy.addAll(c);
+	    iter = copy.iterator();   
+	}
+
+	public boolean hasNext() {
+		return iter.hasNext();
+	}
+
+	public E next() {
+		element = iter.next();
+		return element;
+	}
+
+	public void remove() {
+	    col.remove(element);
+	}
+	
+    }
+    
+       static class MultiReadSet<E>
+	  extends MultiReadCollection<E>
+	  implements Set<E> {
+	private static final long serialVersionUID = 1L;
+
+	MultiReadSet(Set<E> s) {
+            super(s);
+        }
+
+	@Override
+	public boolean equals(Object o) {
+	    rl.lock();
+	    try{
+		return c.equals(o);
+	    }finally{
+		rl.unlock();
+	    }
+        }
+	@Override
+	public int hashCode() {
+	    rl.lock();
+	    try{
+		return c.hashCode();
+	    }finally{
+		rl.unlock();
+	    }
+        }
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionsConcurrent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+/**
+ * An interface for converting Map.Entry to disguise contained References.
+ * 
+ * @author Peter Firmstone
+ */
+interface EntryConversionFactory<O, R> {
+
+    O asFacade(R u);
+
+    R asReference(O w);
+
+    void processQueue();
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Map.Entry;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+class EntryFacadeConverter<K,V> implements EntryConversionFactory<Entry<K,V>, Entry<Reference<K>, Reference<V>>> {
+    private final Ref valRef;
+    private final ReferenceQueueProcessor rqp;
+    private final ReferenceQueue<V> queue;
+
+    EntryFacadeConverter(ReferenceQueue<V> queue, Ref valRef, 
+            ReferenceQueueProcessor rqp) {
+        this.queue = queue;
+        this.valRef = valRef;
+        this.rqp = rqp;
+    }
+
+    public Entry<K,V> asFacade(Entry<Reference<K>, Reference<V>> u) {
+        return new ReferenceEntryFacade<K, V>(u, valRef, queue);
+    }
+
+    public Entry<Reference<K>, Reference<V>> asReference(Entry<K,V> w) {
+        
+        // This is where we check the cast.
+        if (w instanceof ReferenceEntryFacade) {
+            return ((ReferenceEntryFacade<K,V>)w).reveal();
+        }
+        return null;
+    }
+
+    public void processQueue() {
+        rqp.processQueue();
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.util.Iterator;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+class EntryIteratorFacade<O, R> implements Iterator<O> {
+    private Iterator<R> iterator;
+    private EntryConversionFactory<O, R> wf;
+
+    EntryIteratorFacade(Iterator<R> iterator, EntryConversionFactory<O, R> wf) {
+        this.iterator = iterator;
+        this.wf = wf;
+    }
+
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    public O next() {
+        return wf.asFacade(iterator.next());
+    }
+
+    public void remove() {
+        iterator.remove();
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * This doesn't support creation of, nor addition of Entry's
+ * however it does support removal and retention.
+ *
+ * It simply disguises the contained set's reference from the
+ * caller with a Facade, or exposes the underlying references to the set, when
+ * returned by the caller.
+ *
+ * @param <O>
+ * @param <R>
+ */
+/**
+ *
+ * @author peter
+ */
+class EntrySetFacade<O, R> implements Set<O> {
+    private Set<R> set;
+    private EntryConversionFactory<O, R> factory;
+
+    EntrySetFacade(Set<R> set, EntryConversionFactory<O, R> wf) {
+        this.set = set;
+        this.factory = wf;
+    }
+
+    public int size() {
+        factory.processQueue();
+        return set.size();
+    }
+
+    public boolean isEmpty() {
+        factory.processQueue();
+        return set.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        factory.processQueue();
+        // cast is checked by factory and returns null if other.
+        R entryRef = factory.asReference((O) o);
+        // entryRef is null if wrong type.
+        if (entryRef == null) {
+            return false;
+        }
+        return set.contains(entryRef);
+    }
+
+    public Iterator<O> iterator() {
+        factory.processQueue();
+        return new EntryIteratorFacade<O, R>(set.iterator(), factory);
+    }
+
+    public Object[] toArray() {
+        factory.processQueue();
+        Collection<O> result = new ArrayList<O>(set.size());
+        Iterator<O> i = iterator();
+        while (i.hasNext()) {
+            result.add(i.next());
+        }
+        return result.toArray();
+    }
+
+    public boolean remove(Object o) {
+        factory.processQueue();
+        // cast is checked by factory.
+        R entryRef = factory.asReference((O) o);
+        if (entryRef == null) {
+            return false;
+        }
+        return set.remove(entryRef);
+    }
+
+    private Collection<R> asReferenceCollection(Collection<?> c) {
+        int l = c == null ? 0 : c.size();
+        Collection<R> refcoll = new ArrayList<R>(l);
+        if (l > 0) {
+            Iterator<?> i = c.iterator();
+            while (i.hasNext()) {
+                // Factory checks instanceof, we don't have the knowledge here.
+                @SuppressWarnings(value = "unchecked")
+                R ref = factory.asReference((O) i.next());
+                if (ref != null) {
+                    refcoll.add(ref);
+                }
+            }
+        }
+        return refcoll;
+    }
+
+    public boolean containsAll(Collection<?> c) {
+        return set.containsAll(asReferenceCollection(c));
+    }
+
+    public boolean retainAll(Collection<?> c) {
+        return set.retainAll(asReferenceCollection(c));
+    }
+
+    public boolean removeAll(Collection<?> c) {
+        return set.removeAll(asReferenceCollection(c));
+    }
+
+    public void clear() {
+        set.clear();
+    }
+
+    @SuppressWarnings(value = "unchecked")
+    public <O> O[] toArray(O[] a) {
+        a = set.toArray(a);
+        int l = a.length;
+        for (int i = 0; i < l; i++) {
+            a[i] = (O) factory.asFacade((R) a[i]);
+        }
+        return a;
+    }
+
+    public boolean add(O e) {
+        throw new UnsupportedOperationException("Unsupported by most concurrent collection implementations.");
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends O> c) {
+        throw new UnsupportedOperationException("Unsupported by most concurrent collection implementations.");
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.Comparable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ListIterator;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ * <p>
+ * This class contains a number of static methods for using and abstracting
+ * References in Collections.  Interfaces from the Java Collections Framework
+ * are supported.
+ * </p><p>
+ * Referents in these collections may implement {@link Comparable} or
+ * a {@link Comparator} may be used for sorting.  When Comparator's are utilised,
+ * they must first be encapsulated {@link RC#comparator(java.util.Comparator) },
+ * before passing to a constructor for your preferred underlying Collection
+ * implementation.
+ * </p><p>
+ * {@link Comparable} is not supported for IDENTITY == referenced Collections, 
+ * in this case a Comparator must be used.  
+ * </p><p>
+ * All other references support {@link Comparable}, if the referent Object
+ * doesn't implement {@link Comparable}, then {@link Reference#hashCode()} is used
+ * for sorting.  If two referent Objects have identical hashCodes,
+ * but are unequal and do not implement {@link Comparable}, their references
+ * will also have identical hashCodes, so only one of the referents can
+ * be added to a {@link SortedSet} or {@link SortedMap}.  This can be fixed by using a
+ * {@link Comparator}.
+ * </p><p>
+ * For all intents and purposes these utilities behave the same as your preferred
+ * underlying {@link Collection} implementation, with the exception of 
+ * {@link Reference} reachability.  An Object or Key,Value entry is removed 
+ * from a {@link Collection} or {@link Map}, upon becoming eligible for 
+ * garbage collection.
+ * </p><p>
+ * Synchronisation must be implemented by your preferred {@link Collection}
+ * and cannot be performed externally to the returned {@link Collection}.  
+ * Your chosen underlying {@link Collection} must also be mutable.  
+ * Objects will be removed automatically from underlying Collections when 
+ * they are eligible for garbage collection, this breaks external synchronisation.
+ * {@link CollectionsConcurrent#multiReadCollection(java.util.Collection)} may
+ * be useful for synchronising your chosen underlying {@link Collection}, 
+ * especially if Objects are not being garbage collected often and writes
+ * are minimal. 
+ * </p><p>  
+ * An Unmodifiable wrapper {@link Collections#unmodifiableCollection(java.util.Collection)}
+ * may be used externally to prevent additions to the underlying Collections,
+ * referents will still be removed as they become unreachable however.
+ * </p><p>
+ * Note that any Sub List, Sub Set or Sub Map obtained by any of the Java
+ * Collections Framework interfaces, must be views of the underlying
+ * Collection, if the Collection uses defensive copies instead of views, 
+ * References could potentially remain in one copy after garbage collection, 
+ * causing null returns.  If using standard Java Collections Framework 
+ * implementations, these problems don't occur as all Sub Lists, 
+ * Sub Sets or Sub Maps are views only.
+ * </p><p>
+ * {@link Map#entrySet() } view instances returned preserve your chosen reference
+ * behaviour, they do not support {@link Set#add(java.lang.Object)} or 
+ * {@link Set#addAll(java.util.Collection)} methods, which throw
+ * an {@link UnsupportedOperationException}.  All other methods are fully implemented
+ * and supported.
+ * </p><p>
+ * {@link Entry} view instances returned by these methods preserve reference
+ * behaviour, all methods are fully implemented and supported.
+ * </p><p>
+ * {@link Set} and it's sub interfaces {@link SortedSet} and 
+ * {@link NavigableSet}, return views that preserve reference behaviour, 
+ * all methods are fully implemented and supported.
+ * </p><p>
+ * {@link Map} and it's sub interfaces {@link SortedMap}, {@link NavigableMap},
+ * {@link ConcurrentMap} and {@link ConcurrentNavigableMap} return
+ * views that preserve reference behaviour, all methods are fully implemented 
+ * and supported.
+ * </p><p>      
+ * {@link List} returns views that preserve reference behaviour, all methods are 
+ * fully implemented and supported.
+ * </p><p>
+ * {@link Queue} and it's sub interfaces {@link Deque}, {@link BlockingQueue} and
+ * {@link BlockingDeque} return views that preserve reference behaviour, 
+ * all methods are fully implemented and supported.
+ * </p><p>
+ * {@link Iterator} and {@link ListIterator} views preserve reference behaviour, all methods
+ * are fully implemented and supported.
+ * </p><p>
+ * RC stands for Reference Collection and is abbreviated due to the length of
+ * generic parameter arguments typically required.
+ * </p>
+ * @see Ref
+ * @see Reference
+ * @author Peter Firmstone.
+ */
+public class RC {
+    private RC(){} // Non instantiable
+    
+    /**
+     * When using a Comparator in SortedSet's and SortedMap's, the Comparator
+     * must be encapsulated using this method, to order the Set or Map 
+     * by referents and not References.
+     * 
+     * @param <T>
+     * @param comparator
+     * @return
+     */
+    public static <T> Comparator<Reference<T>> comparator(Comparator<T> comparator){
+        return new ReferenceComparator<T>(comparator);
+    }
+    
+    /**
+     * Wrap a Collection for holding references so it appears as a Collection
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> Collection<T> collection(Collection<Reference<T>> internal, Ref type){
+        return new ReferenceCollection<T>(internal, type);
+            }
+            
+    /**
+     * Wrap a List for holding references so it appears as a List
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> List<T> list(List<Reference<T>> internal, Ref type){
+        return new ReferenceList<T>(internal, type);
+    }   
+    
+    /** 
+     * Wrap a Set for holding references so it appears as a Set
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> Set<T> set(Set<Reference<T>> internal, Ref type){
+        return new ReferenceSet<T>(internal, type);
+    }
+    /**
+     * Wrap a SortedSet for holding references so it appears as a SortedSet
+     * containing referents.
+     * 
+     * @para        m <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> SortedSet<T> sortedSet(
+            SortedSet<Reference<T>> internal, Ref type){
+        return new ReferenceSortedSet<T>(internal, type);
+    }
+    /**
+     * Wrap a NavigableSet for holding references so it appears as a NavigableSet
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> NavigableSet<T> navigableSet(
+            NavigableSet<Reference<T>> internal, Ref type){
+        return new ReferenceNavigableSet<T>(internal, type);
+    }
+    /**
+     * Wrap a Queue for holding references so it appears as a Queue
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> Queue<T> queue(Queue<Reference<T>> internal, Ref type){
+        return new ReferencedQueue<T>(internal, type);
+    }
+    /**
+     * Wrap a Deque for holding references so it appears as a Deque
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> Deque<T> deque(Deque<Reference<T>> internal, Ref type){
+        return new ReferenceDeque<T>(internal, type);
+    }
+    /**
+     * Wrap a BlockingQueue for holding references so it appears as a BlockingQueue
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> BlockingQueue<T> blockingQueue(
+            BlockingQueue<Reference<T>> internal, Ref type){
+        return new ReferenceBlockingQueue<T>(internal, type);
+    }
+    /**
+     * Wrap a Blocki    ngDeque for holding references so it appears as a BlockingDeque
+     * containing referents.
+     * 
+     * @param <T>
+     * @param internal
+     * @param type
+     * @return
+     */
+    public static <T> BlockingDeque<T> blockingDeque(
+            BlockingDeque<Reference<T>> internal, Ref type){
+        return new ReferenceBlockingDeque<T>(internal, type);
+    }
+    /**
+     * Wrap a Map for holding references so it appears as a Map
+     * containing referents.
+     * 
+     * @param <K>
+     * @param <V>
+     * @param internal
+     * @param key
+     * @param value
+     * @return
+     */
+    public static <K, V> Map<K, V> map(
+            Map<Reference<K>, Reference<V>> internal, Ref key, Ref value){
+        return new ReferenceMap<K, V>(internal, key, value);
+    }
+    /**
+     * Wrap a SortedMap for holding references so it appears as a SortedMap
+     * containing referents.
+     * 
+     * @param <K>
+     * @param <V>
+     * @param internal
+     * @param key
+     * @param value
+     * @return
+     */
+    public static <K, V> SortedMap<K, V> sortedMap(
+            SortedMap<Reference<K>, Reference<V>> internal, Ref key, Ref value){
+        return new ReferenceSortedMap<K, V>(internal, key, value);
+    }
+    /**
+     * Wrap a NavigableMap for holding references so it appears as a NavigableMap
+     * containing referents.
+     * 
+     * @param <K>
+     * @param <V>
+     * @param internal
+     * @param key
+     * @param value
+     * @return
+     */
+    public static <K, V> NavigableMap<K, V> navigableMap(
+            NavigableMap<Reference<K>, Reference<V>> internal, Ref key, Ref value){
+        return new ReferenceNavigableMap<K, V>(internal, key, value);
+    }
+    /**
+     * Wrap a ConcurrentMap for holding references so it appears as a ConcurrentMap
+     * containing referents.
+     * 
+     * @param <K> - key type.
+     * @param <V> - value type.
+     * @param internal - for holding references.
+     * @param key - key reference type.
+     * @param value - value reference type.
+     * @return
+     */
+    public static <K, V> ConcurrentMap<K, V> concurrentMap(
+            ConcurrentMap<Reference<K>, Reference<V>> internal, Ref key, Ref value){
+        return new ReferenceConcurrentMap<K, V>(internal, key, value);
+    }
+    
+    /**
+     * Wrap a ConcurrentNavigableMap for holding references so it appears as a 
+     * ConcurrentNavigableMap containing referents.
+     * 
+     * @param <K>
+     * @param <V>
+     * @param internal
+     * @param key
+     * @param value
+     * @return
+     */
+    public static <K, V> ConcurrentNavigableMap<K, V> concurrentNavigableMap(
+            ConcurrentNavigableMap<Reference<K>, Reference<V>> internal, Ref key, Ref value){
+        return new ReferenceConcurrentNavigableMap<K, V>(internal, key, value);
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/RC.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+/**
+ * Ref enum represents the types of references provided by the jvm, it also
+ * defines equals contract behaviour for Reference implementations suitable
+ * for use in Collections.
+ * 
+ * It is recommended to use STRONG or IDENTITY based references as keys
+ * in Maps, due to the unpredictability that occurs, when an equal
+ * Reference Key disappears due to garbage collection.
+ * 
+ * Map implementations must delete their key -> value mapping when either the 
+ * key or value References become unreachable.
+ * 
+ * Object.toString() is overridden in the reference implementations to return
+ * toString() of the referent, if it is still reachable, otherwise the reference
+ * calls their superclass toString() method, where superclass is a java 
+ * Reference subclass.
+ * 
+ * Phantom references are not used, they are designed to replace
+ * {@link Object#finalize() } and remain unreachable, but not garbage collected until
+ * the {@link PhantomReference} also becomes unreachable, get() always returns
+ * null.
+ *
+ * @see Reference
+ * @see WeakReference
+ * @see SoftReference
+ * @see PhantomReference
+ * @see Map
+ * @see ConcurrentMap
+ * @see List
+ * @see Set
+ * @see Collection
+ * @see Comparable
+ * @author Peter Firmstone.
+ */
+public enum Ref {
+    /**
+     * SOFT References implement equals based on equality of the referent 
+     * objects, while the referent is still reachable.  The hashCode
+     * implementation is based on the referent implementation of hashCode,
+     * while the referent is reachable.
+     * 
+     * After garbage collection, Reference equality is based
+     * on the original identity of the referents using System.identityHashCode().
+     * 
+     * Because {@link System#identityHashCode(java.lang.Object)} is not unique, 
+     * the referents Class.hashCode() is also used to calculate the hashCode, 
+     * generated during Reference construction.
+     * 
+     * SOFT References implement Comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Objects don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * 
+     * Garbage collection must be the same as SoftReference.
+     * @see SoftReference
+     * @see WeakReference
+     * @see Comparable
+     */
+    SOFT,
+    /**
+     * SOFT_IDENTY References implement equals based on identity == of the
+     * referent objects.
+     * 
+     * Garbage collection must be the same as SoftReference.
+     * @see SoftReference
+     */
+    SOFT_IDENTITY,
+    /**
+     * WEAK References implement equals based on equality of the referent 
+     * objects, while the referent is still reachable.  The hashCode
+     * implementation is based on the referent implementation of hashCode,
+     * while the referent is reachable.
+     * 
+     * After garbage collection, Reference equality is based
+     * on the original identity of the referents using System.identityHashCode().
+     * 
+     * Because System.identityHashCode() is not unique, the referents
+     * Class.hashCode() is also used to calculate the hashCode, generated during
+     * Reference construction.
+     * 
+     * WEAK References implement comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Object's don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * 
+     * Garbage collection must be the same as WeakReference.
+     * @see WeakReference
+     * @see Comparable
+     */
+    WEAK,
+    /**
+     * WEAK_IDENTY References implement equals based on identity == of the
+     * referent objects.
+     * 
+     * Garbage collection must be the same as WeakReference.
+     * 
+     * @see WeakReference
+     */
+    WEAK_IDENTITY,
+    /**
+     * STRONG References implement equals and hashCode() based on the 
+     * equality of the underlying Object.
+     * 
+     * STRONG References implement Comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Object's don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * 
+     * Garbage collection doesn't occur until the Reference is cleared.
+     * @see Comparable
+     */
+    STRONG
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/Ref.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingDeque<T> extends ReferenceDeque<T> implements BlockingDeque<T>{
+    
+    private final BlockingDeque<Reference<T>> deque;
+    private final Ref type;
+    
+    ReferenceBlockingDeque(BlockingDeque<Reference<T>> deque, Ref type){
+        super(deque, type);
+        this.deque = deque;
+        this.type = type;
+    }
+
+
+    public void putFirst(T e) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.putFirst(r);
+    }
+
+
+    public void putLast(T e) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.putLast(r);
+    }
+
+
+    public boolean offerFirst(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return deque.offerFirst(r, timeout, unit);
+    }
+
+
+    public boolean offerLast(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return deque.offerLast(r, timeout, unit);
+    }
+
+
+    public T takeFirst() throws InterruptedException {
+        processQueue();
+        return deque.takeFirst().get();
+    }
+
+
+    public T takeLast() throws InterruptedException {
+        processQueue();
+        return deque.takeLast().get();
+    }
+
+
+    public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.pollFirst(timeout, unit).get();
+    }
+
+
+    public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.pollLast(timeout, unit).get();
+    }
+
+
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.put(r);
+    }
+
+
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return deque.offer(r,timeout, unit);
+    }
+
+
+    public T take() throws InterruptedException {
+        processQueue();
+        return deque.take().get();
+    }
+
+
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.poll(timeout, unit).get();
+    }
+
+
+    public int remainingCapacity() {
+        return deque.remainingCapacity();
+    }
+
+
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(deque.size());
+        int count = deque.drainTo(drain);
+        Iterator<Reference<T>> i = drain.iterator();
+        while (i.hasNext()){
+            c.add(i.next().get());
+        }
+        return count;    
+    }
+
+
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
+        int count = deque.drainTo(drain, maxElements);
+        Iterator<Reference<T>> i = drain.iterator();
+        while (i.hasNext()){
+            c.add(i.next().get());
+        }
+        return count;
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @param <T> 
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingQueue<T> extends ReferencedQueue<T> implements BlockingQueue<T> {
+    private final BlockingQueue<Reference<T>> queue;
+    private final Ref type;
+    
+    ReferenceBlockingQueue(BlockingQueue<Reference<T>> queue, Ref type){
+        super(queue, type);
+        this.queue = queue;
+        this.type = type;
+    }
+
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        queue.put(r);
+    }
+
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return queue.offer(r, timeout, unit);
+    }
+
+    public T take() throws InterruptedException {
+        processQueue();
+        return queue.take().get();
+    }
+
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return queue.poll(timeout, unit).get();
+    }
+
+    public int remainingCapacity() {
+        processQueue();
+        return queue.remainingCapacity();
+    }
+
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(queue.size());
+        int count = queue.drainTo(drain);
+        Iterator<Reference<T>> i = drain.iterator();
+        while (i.hasNext()){
+            c.add(i.next().get());
+        }
+        return count;    
+    }
+
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
+        int count = queue.drainTo(drain, maxElements);
+        Iterator<Reference<T>> i = drain.iterator();
+        while (i.hasNext()){
+            c.add(i.next().get());
+        }
+        return count;
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A Collection of Reference Objects, the developer may chose any Collection
+ * implementation to store the References, which is passed in a runtime.
+ * 
+ * The underlying Collection implementation governs the specific behaviour of this
+ * Collection.
+ * 
+ * Synchronisation must be implemented by the underlying Collection and cannot
+ * be performed externally to this class.  The underlying Collection must
+ * also be mutable.  Object will be removed automatically from the underlying
+ * Collection when they are eligible for garbage collection.
+ * 
+ * Weak, Weak Identity, Soft, Soft Identity or Strong references may be used.
+ * This Collection may be used as an Object pool cache or any other purpose 
+ * that requires unique memory handling.
+ * 
+ * For concurrent threads, it is recommended to encapsulate the underlying
+ * collection in a multi read, single write collection for scalability.
+ * 
+ * @see Ref
+ * @see ConcurrentCollections#multiReadCollection(java.util.Collection) 
+ * @author Peter Firmstone.
+ */
+class ReferenceCollection<T> implements Collection<T>{
+    private Collection<Reference<T>> col;
+    private Ref type;
+    private ReferenceQueue<T> queue;
+    
+    ReferenceCollection(Collection<Reference<T>> col, Ref type){
+        this.col = col;
+        this.type = type;
+        queue = new ReferenceQueue<T>(); 
+    }
+    
+    ReferenceCollection(Collection<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
+        this.col = col;
+        this.type = type;
+        this.queue = queue;
+    }
+    
+    void processQueue(){
+        Object t = null;
+        while ( (t = queue.poll()) != null){
+            col.remove(t);
+        }
+    }
+    
+    ReferenceQueue<T> getQueue(){
+        return queue;
+    }
+    
+    Ref getType(){
+        return type;
+    }
+    
+    Reference<T> wrapObj(T t, boolean enqueue){
+        return ReferenceFactory.create(t, enqueue == true ? queue: null , type);
+    }
+    
+    Collection<Reference<T>> wrapColl( Collection<? extends T> c, boolean enqueue){
+        Collection<Reference<T>> refc = new ArrayList<Reference<T>>(c.size());
+        Iterator<? extends T> i = c.iterator();
+        while (i.hasNext()){
+            refc.add(wrapObj(i.next(), false));
+        }
+        return refc;
+    }
+
+    public int size() {
+        processQueue();
+        return col.size();
+    }
+
+    public boolean isEmpty() {
+        processQueue();
+        return col.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        processQueue();
+        return col.contains(wrapObj((T) o, false));
+    }
+    
+    /**
+     * This Iterator may return null values if garbage collection
+     * runs during iteration.
+     * 
+     * Always check for null values.
+     * 
+     * @return T - possibly null.
+     */
+    public Iterator<T> iterator() {
+        processQueue();
+        return new ReferenceIterator<T>(col.iterator());
+    }
+
+    public Object[] toArray() {
+        processQueue();
+        int l = col.size();
+        Object[] array = col.toArray(new Object[l]);
+        for (int i = 0; i < l; i++){
+            array[i] = ((Reference)array[i]).get();
+        }
+        return array;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T[] toArray(T[] a) {
+        processQueue();
+        int l = col.size();
+        Reference<T>[] result = col.toArray( new Reference[col.size()]);
+        if ( a.length < result.length) a = (T[]) new Object[result.length];
+        for (int i = 0; i < l; i++){
+            a[i] =  (T) result[i].get();
+        }
+        return a;
+    }
+
+    public boolean add(T e) {
+        processQueue();
+        return col.add(wrapObj(e, true));
+    }
+
+    public boolean remove(Object o) {
+        processQueue();
+        return col.remove(wrapObj((T) o, false));
+    }
+
+    public boolean containsAll(Collection<?> c) {
+        processQueue();
+        return col.containsAll(wrapColl((Collection<T>) c, false));
+    }
+
+    public boolean addAll(Collection<? extends T> c) {
+        processQueue();
+        return col.addAll(wrapColl((Collection<T>) c, true));
+    }
+
+    public boolean removeAll(Collection<?> c) {
+        processQueue();
+        return col.removeAll(wrapColl((Collection<T>) c, false));
+    }
+
+    public boolean retainAll(Collection<?> c) {
+        processQueue();
+        return col.retainAll(wrapColl((Collection<T>) c, false));
+    }
+
+    public void clear() {
+        col.clear();
+    }
+    
+    /*
+     * The next three methods are suitable implementations for subclasses also.
+     */
+    public String toString(){
+        return col.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 73 * hash + (this.col != null ? this.col.hashCode() : 0);
+        hash = 73 * hash + (this.type != null ? this.type.hashCode() : 0);
+        hash = 73 * hash + (this.getClass().hashCode());
+        return hash;
+    }
+    
+    @Override
+    public boolean equals(Object o){
+        if ( o == this ) return true;
+        if ( hashCode() != o.hashCode()) return false;
+        if (!(this.getClass().equals(o.getClass()))) return false;
+        if (!( o instanceof ReferenceCollection)) return false;
+        ReferenceCollection that = (ReferenceCollection) o;
+        if ( type.equals(that.type) && col.equals(that.col)) return true;
+        return false;
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.Comparator;
+
+/**
+ *
+ * @param <T> 
+ * @author Peter Firmstone.
+ */
+class ReferenceComparator<T> implements Comparator<Reference<T>> {
+    private final Comparator<T> comparator;
+    ReferenceComparator(Comparator<T> comparator){
+        this.comparator = comparator;
+    }
+
+    public int compare(Reference<T> o1, Reference<T> o2) {
+        return comparator.compare(o1.get(), o2.get());
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 61 * hash + (this.comparator != null ? this.comparator.hashCode() : 0);
+        return hash;
+    }
+    
+    @Override
+    public boolean equals(Object o){
+        if (o == this) return true;
+        if (o instanceof ReferenceComparator){
+            return (comparator.equals(((ReferenceComparator)o).comparator));
+        }
+        return false;
+    }
+    
+    Comparator<? super T> get(){
+        return comparator;
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A referenced hash map, that encapsulates and utilises any ConcurrentMap
+ * implementation passed in at construction.
+ * 
+ * Based on any ConcurrentMap implementation, it doesn't accept null keys or values.
+ *
+ * It is recommended although not mandatory to use identity based References for keys,
+ * unexpected results occur when relying on equal keys, if one key is no longer 
+ * strongly reachable and has been garbage collected and removed from the 
+ * Map.
+ * 
+ * 
+ * 
+ * If either a key or value, is no longer strongly reachable, their mapping
+ * will be queued for removal and garbage collection, in compliance with
+ * the Reference implementation selected.
+ *
+ * @param <K> 
+ * @param <V> 
+ * @see Ref
+ * @author Peter Firmstone.
+ *
+ * @since 2.3
+ */
+class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V> implements ConcurrentMap<K, V> {
+
+    
+    // ConcurrentMap must be protected from null values?  It changes it's behaviour, is that a problem?
+    private final ConcurrentMap<Reference<K>, Reference<V>> map;
+    
+    ReferenceConcurrentMap(ConcurrentMap<Reference<K>,Reference<V>> map, Ref key, Ref val){
+        super (map, key, val);
+        this.map = map;
+    }
+    
+    ReferenceConcurrentMap(ConcurrentMap<Reference<K>,Reference<V>> map,
+            Ref key, Ref val, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
+        super (map, key, val, keyQueue, valQueue);
+        this.map = map;
+    }
+    
+    public V putIfAbsent(K key, V value) {
+        processQueue();  //may be a slight delay before atomic putIfAbsent
+        if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
+        Reference<K> k = wrapKey(key, true);
+        Reference<V> v = wrapVal(value, true);
+        Reference<V> val = map.putIfAbsent(k, v);
+        if ( val != null ) return val.get();
+        return null;
+    }
+
+    public boolean remove(Object key, Object value) {
+        processQueue();
+        if (key == null || value == null) return false;
+        @SuppressWarnings("unchecked")
+        Reference<K> k = wrapKey( (K) key, false);
+        @SuppressWarnings("unchecked")
+        Reference<V> v = wrapVal( (V) value, false);
+        return map.remove(k, v);
+    }
+
+    public boolean replace(K key, V oldValue, V newValue) {
+        processQueue();
+        if (key == null || oldValue == null || newValue == null) return false;
+        Reference<K> k = wrapKey(key, true);
+        Reference<V> vOld = wrapVal(oldValue, false);
+        Reference<V> vNew = wrapVal(newValue, true);
+        return map.replace(k, vOld, vNew);
+    }
+
+    public V replace(K key, V value) {
+        processQueue();
+        if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
+        Reference<K> k = wrapKey(key, true);
+        Reference<V> v = wrapVal(value, true);
+        Reference<V> val = map.replace(k, v);
+        if ( val != null ) return val.get();
+        return null;
+    }
+    
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ *
+ * @param <K> 
+ * @param <V> 
+ * @author Peter Firmstone.
+ */
+class ReferenceConcurrentNavigableMap<K,V> 
+extends ReferenceConcurrentMap<K,V> implements ConcurrentNavigableMap<K,V>{
+    private ConcurrentNavigableMap<Reference<K>,Reference<V>> map;
+    private Ref keyRef;
+    private Ref valRef;
+    
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
+        super(map, keyRef, valRef);
+        this.map = map;
+        this.keyRef = keyRef;
+        this.valRef = valRef;
+    }
+
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map,
+            Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
+        super(map, keyRef, valRef, keyQueue, valQueue);
+        this.map = map;
+        this.keyRef = keyRef;
+        this.valRef = valRef;
+    }
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.subMap(
+                wrapKey(fromKey, false), 
+                fromInclusive, 
+                wrapKey(toKey, false),
+                toInclusive
+            ), 
+            keyRef, 
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.headMap(wrapKey(toKey, false), inclusive),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.tailMap(wrapKey(fromKey, false), inclusive),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.subMap(wrapKey(fromKey, false), wrapKey(toKey, false)),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.headMap(wrapKey(toKey, false)),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.tailMap(wrapKey(fromKey, false)),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public ConcurrentNavigableMap<K, V> descendingMap() {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.descendingMap(),
+            keyRef,
+            valRef,
+            getKeyQueue(),
+            getValQueue()
+        );
+    }
+
+
+    public NavigableSet<K> navigableKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.navigableKeySet(), keyRef, getKeyQueue());
+    }
+
+
+    public NavigableSet<K> keySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.keySet(), keyRef, getKeyQueue());
+    }
+
+
+    public NavigableSet<K> descendingKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.descendingKeySet(), keyRef, getKeyQueue());
+    }
+
+
+    public Entry<K, V> lowerEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.lowerEntry(wrapKey(key, false)),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public K lowerKey(K key) {
+        processQueue();
+        return map.lowerKey(wrapKey(key, false)).get();
+    }
+
+
+    public Entry<K, V> floorEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.floorEntry(wrapKey(key, false)),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public K floorKey(K key) {
+        processQueue();
+        return map.floorKey(wrapKey(key, false)).get();
+    }
+
+
+    public Entry<K, V> ceilingEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.ceilingEntry(wrapKey(key, false)),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public K ceilingKey(K key) {
+        processQueue();
+        return map.ceilingKey(wrapKey(key, false)).get();
+    }
+
+
+    public Entry<K, V> higherEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.higherEntry(wrapKey(key, false)),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public K higherKey(K key) {
+        processQueue();
+        return map.higherKey(wrapKey(key, false)).get();
+    }
+
+
+    public Entry<K, V> firstEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.firstEntry(),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public Entry<K, V> lastEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.lastEntry(),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public Entry<K, V> pollFirstEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.pollFirstEntry(),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+
+    public Entry<K, V> pollLastEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.pollLastEntry(),
+            valRef, 
+            getValQueue()
+        );
+    }
+
+    
+    @SuppressWarnings("unchecked")
+    public Comparator<? super K> comparator() {
+        processQueue();
+        Comparator<? super Reference<K>> c = map.comparator();
+        if ( c instanceof ReferenceComparator){
+            return ((ReferenceComparator) c).get();
+        }
+        return null;
+    }
+
+
+    public K firstKey() {
+        processQueue();
+        return map.firstKey().get();
+    }
+
+
+    public K lastKey() {
+        processQueue();
+        return map.lastKey().get();
+    }
+
+
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.Deque;
+import java.util.Iterator;
+
+/**
+ *
+ * @author Peter Firmstone
+ */
+class ReferenceDeque<T> extends ReferencedQueue<T> implements Deque<T>{
+    private final Deque<Reference<T>> deque;
+    ReferenceDeque(Deque<Reference<T>> deque, Ref type){
+        super(deque, type);
+        this.deque = deque;
+    }
+    public void addFirst(T e) {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.addFirst(r);
+    }
+
+    public void addLast(T e) {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.addLast(r);
+    }
+
+    public boolean offerFirst(T e) {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return deque.offerFirst(r);
+    }
+
+    public boolean offerLast(T e) {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        return deque.offerLast(r);
+    }
+
+    public T removeFirst() {
+        processQueue();
+        return deque.removeFirst().get();
+    }
+
+    public T removeLast() {
+        processQueue();
+        return deque.removeLast().get();
+    }
+
+    public T pollFirst() {
+        processQueue();
+        return deque.pollFirst().get();
+    }
+
+    public T pollLast() {
+        processQueue();
+        return deque.pollLast().get();
+    }
+
+    public T getFirst() {
+        processQueue();
+        return deque.getFirst().get();
+    }
+
+    public T getLast() {
+        processQueue();
+        return deque.getLast().get();
+    }
+
+    public T peekFirst() {
+        processQueue();
+        return deque.peekFirst().get();
+    }
+
+    public T peekLast() {
+        processQueue();
+        return deque.peekLast().get();
+    }
+
+    public boolean removeFirstOccurrence(Object o) {
+        processQueue();
+        @SuppressWarnings("unchecked")
+        Reference<T> r = wrapObj((T) o, false);
+        return deque.removeFirstOccurrence(r);
+    }
+
+    public boolean removeLastOccurrence(Object o) {
+        processQueue();
+        @SuppressWarnings("unchecked")
+        Reference<T> r = wrapObj((T) o, false);
+        return deque.removeLastOccurrence(r);
+    }
+
+    public void push(T e) {
+        processQueue();
+        Reference<T> r = wrapObj(e, true);
+        deque.push(r);
+    }
+
+    public T pop() {
+        processQueue();
+        return deque.pop().get();
+    }
+
+    public Iterator<T> descendingIterator() {
+        return new ReferenceIterator<T>(deque.descendingIterator());
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.river.api.util.Facade;
+
+/**
+ * 
+ * @author Peter Firmstone.
+ */
+class ReferenceEntryFacade<K, V> implements Map.Entry<K, V>, Facade<Entry<Reference<K>,Reference<V>>> {
+    private final Map.Entry<Reference<K>, Reference<V>> entry;
+    private final ReferenceQueue<V> queue;
+    private final Ref valRef;
+
+    ReferenceEntryFacade(Map.Entry<Reference<K>, Reference<V>> entry,
+                                        Ref valRef, ReferenceQueue<V> queue) {
+        this.entry = entry;
+        this.queue = queue;
+        this.valRef = valRef;
+    }
+    
+    public K getKey() {
+        return entry.getKey().get();
+    }
+
+    public V getValue() {
+        return entry.getValue().get();
+    }
+
+    public V setValue(V value) {
+        return entry.setValue(wrapVal(value, true)).get();
+    }
+
+    private Reference<V> wrapVal(V val, boolean enque) {
+        return ReferenceFactory.create(val, enque == true ? queue : null, valRef);
+    }
+
+    public Entry<Reference<K>, Reference<V>> reveal() {
+        return entry;
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+
+/**
+ * ReferenceFactory creates References, representing the various subclasses
+ * of Reference, such as WEAK, SOFT and STRONG for
+ * use in collections.
+ * 
+ * These subtypes override equals and hashcode, so that References may be used
+ * in Collections, Maps, Sets and Lists to represent their referent.
+ * 
+ * @see Reference
+ * @see Ref
+ * @author Peter Firmstone.
+ */
+class ReferenceFactory<T> {
+    
+    // Non instantiable.
+    private ReferenceFactory(){}
+    
+    static <T> Reference<T> create(T t, ReferenceQueue<? super T> queue, Ref type ){
+        switch (type){
+            case WEAK_IDENTITY: return new WeakIdentityReferenceKey<T>(t, queue);
+            case SOFT_IDENTITY: return new SoftIdentityReferenceKey<T>(t, queue);
+            case WEAK: return new WeakReferenceKey<T>(t, queue);
+            case SOFT: return new SoftReferenceKey<T>(t, queue);
+            default: return new StrongReferenceKey<T>(t, queue);
+        }
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java?rev=1173698&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java Wed Sep 21 15:02:02 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.Iterator;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+class ReferenceIterator<T> implements Iterator<T> {
+    private final Iterator<Reference<T>> iterator;
+
+    ReferenceIterator(Iterator<Reference<T>> iterator) {
+        this.iterator = iterator;
+    }
+
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    public T next() {
+        return iterator.next().get();
+    }
+
+    public void remove() {
+        iterator.remove();
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native