You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2011/10/03 10:24:24 UTC

svn commit: r1178329 [1/2] - in /river/jtsk/skunk/peterConcurrentPolicy: src/org/apache/river/impl/util/ test/src/org/apache/river/impl/util/

Author: peter_firmstone
Date: Mon Oct  3 08:24:23 2011
New Revision: 1178329

URL: http://svn.apache.org/viewvc?rev=1178329&view=rev
Log:
Reference Collection utilities null reference checks and non blocking concurrency

Added:
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java   (with props)
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java   (with props)
Modified:
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftIdentityReferenceKey.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftReferenceKey.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakIdentityReferenceKey.java
    river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakReferenceKey.java
    river/jtsk/skunk/peterConcurrentPolicy/test/src/org/apache/river/impl/util/ReferenceCollectionTest.java

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java Mon Oct  3 08:24:23 2011
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ *
+ * @author peter
+ */
+public class CollectionWrapper<T> extends AbstractCollection<Reference<T>> implements Collection<Reference<T>> {
+    private final Collection<T> col;
+    private final ReferenceQueuingFactory<T, Reference<T>> rqf;
+    private final boolean enque;
+    
+    CollectionWrapper(Collection<T> col, ReferenceQueuingFactory<T, Reference<T>> rqf, boolean enque){
+        this.col = col;
+        this.rqf = rqf;
+        this.enque = enque;
+    }
+
+    @Override
+    public Iterator<Reference<T>> iterator() {
+        return new Iter<T>(col.iterator(), rqf);
+    }
+
+    @Override
+    public int size() {
+        return col.size();
+    }
+    
+    public boolean add(Reference<T> t) {
+	return col.add( t != null ? t.get() : null );
+    }
+    
+    private class Iter<T> implements Iterator<Reference<T>> {
+        Iterator<T> iterator;
+        private final ReferenceQueuingFactory<T, Reference<T>> rqf;
+        Iter(Iterator<T> it, ReferenceQueuingFactory<T, Reference<T>> rqf){
+            iterator = it;
+            this.rqf = rqf;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Reference<T> next() {
+            return rqf.referenced( iterator.next(), enque);
+        }
+
+        @Override
+        public void remove() {
+            iterator.remove();
+        }
+        
+    }
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java Mon Oct  3 08:24:23 2011
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.impl.util;
-
-/**
- * An interface for converting Map.Entry to disguise contained References.
- * 
- * @author Peter Firmstone
- */
-interface EntryConversionFactory<O, R> {
-
-    O asFacade(R u);
-
-    R asReference(O w, boolean enque);
-
-    void processQueue();
-    
-}

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java Mon Oct  3 08:24:23 2011
@@ -19,33 +19,38 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
 
 /**
  *
  * @author Peter Firmstone.
  */
-class EntryFacadeConverter<K,V> implements EntryConversionFactory<Entry<K,V>, Entry<Reference<K>, Reference<V>>> {
-    ReferenceMap<K,V> map;
+class EntryFacadeConverter<K,V> implements ReferenceQueuingFactory<Entry<K,V>, Entry<Reference<K>, Reference<V>>> {
+    private final ReferenceQueuingFactory<K, Reference<K>> krqf;
+    private final ReferenceQueuingFactory<V, Reference<V>> vrqf;
 
-    EntryFacadeConverter(ReferenceMap<K,V> map) {
-        this.map = map;
+
+    EntryFacadeConverter(ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf) {
+        this.krqf = krqf;
+        this.vrqf = vrqf;
     }
 
-    public Entry<K,V> asFacade(Entry<Reference<K>, Reference<V>> u) {
-        return new ReferenceEntryFacade<K, V>(u, map.getValRef(), map.getValQueue());
+    public Entry<K,V> pseudoReferent(Entry<Reference<K>, Reference<V>> u) {
+        return new ReferenceEntryFacade<K, V>(u, vrqf);
     }
 
-    public Entry<Reference<K>, Reference<V>> asReference(Entry<K,V> w, boolean enque) {
+    public Entry<Reference<K>, Reference<V>> referenced(Entry<K,V> w, boolean enque) {
+        // The entry could already be a Reference-based Entry obscured by a facade.
         return new SimpleEntry<Reference<K>, Reference<V>>(
-            map.wrapKey(w.getKey(), enque), map.wrapVal(w.getValue(), enque)
+            krqf.referenced(w.getKey(), enque), vrqf.referenced(w.getValue(), enque)
         );
         }
 
     public void processQueue() {
-        map.processQueue();
+        krqf.processQueue();
+        vrqf.processQueue();
     }
     
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java Mon Oct  3 08:24:23 2011
@@ -26,9 +26,9 @@ import java.util.Iterator;
  */
 class EntryIteratorFacade<O, R> implements Iterator<O> {
     private Iterator<R> iterator;
-    private EntryConversionFactory<O, R> wf;
+    private ReferenceQueuingFactory<O, R> wf;
 
-    EntryIteratorFacade(Iterator<R> iterator, EntryConversionFactory<O, R> wf) {
+    EntryIteratorFacade(Iterator<R> iterator, ReferenceQueuingFactory<O, R> wf) {
         this.iterator = iterator;
         this.wf = wf;
     }
@@ -38,7 +38,7 @@ class EntryIteratorFacade<O, R> implemen
     }
 
     public O next() {
-        return wf.asFacade(iterator.next());
+        return wf.pseudoReferent(iterator.next());
     }
 
     public void remove() {

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java Mon Oct  3 08:24:23 2011
@@ -36,9 +36,9 @@ import java.util.Set;
  */
 class EntrySetFacade<O, R> extends AbstractSet<O> implements Set<O> {
     private Set<R> set;
-    private EntryConversionFactory<O, R> factory;
+    private ReferenceQueuingFactory<O, R> factory;
 
-    EntrySetFacade(Set<R> set, EntryConversionFactory<O, R> wf) {
+    EntrySetFacade(Set<R> set, ReferenceQueuingFactory<O, R> wf) {
         this.set = set;
         this.factory = wf;
     }
@@ -65,6 +65,6 @@ class EntrySetFacade<O, R> extends Abstr
     public boolean add(O e) {
         factory.processQueue();
         if ( e == null ) return false;
-        return set.add(factory.asReference(e, true));
+        return set.add(factory.referenced(e, true));
     }
     }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java Mon Oct  3 08:24:23 2011
@@ -19,9 +19,7 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -32,12 +30,10 @@ import java.util.concurrent.TimeUnit;
 class ReferenceBlockingDeque<T> extends ReferenceDeque<T> implements BlockingDeque<T>{
     
     private final BlockingDeque<Reference<T>> deque;
-    private final Ref type;
     
     ReferenceBlockingDeque(BlockingDeque<Reference<T>> deque, Ref type){
         super(deque, type);
         this.deque = deque;
-        this.type = type;
     }
 
 
@@ -71,25 +67,33 @@ class ReferenceBlockingDeque<T> extends 
 
     public T takeFirst() throws InterruptedException {
         processQueue();
-        return deque.takeFirst().get();
+        Reference<T> t = deque.takeFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
     public T takeLast() throws InterruptedException {
         processQueue();
-        return deque.takeLast().get();
+        Reference<T> t = deque.takeLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
     public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
         processQueue();
-        return deque.pollFirst(timeout, unit).get();
+        Reference<T> t = deque.pollFirst(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
     public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
         processQueue();
-        return deque.pollLast(timeout, unit).get();
+        Reference<T> t = deque.pollLast(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
@@ -109,13 +113,17 @@ class ReferenceBlockingDeque<T> extends 
 
     public T take() throws InterruptedException {
         processQueue();
-        return deque.take().get();
+        Reference<T> t = deque.take();
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
     public T poll(long timeout, TimeUnit unit) throws InterruptedException {
         processQueue();
-        return deque.poll(timeout, unit).get();
+        Reference<T> t = deque.poll(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
     }
 
 
@@ -126,24 +134,20 @@ class ReferenceBlockingDeque<T> extends 
 
     public int drainTo(Collection<? super T> c) {
         processQueue();
-        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(deque.size());
-        int count = deque.drainTo(drain);
-        Iterator<Reference<T>> i = drain.iterator();
-        while (i.hasNext()){
-            c.add(i.next().get());
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Reference<T>> dr = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+        return deque.drainTo(dr);
         }
-        return count;    
-    }
 
 
     public int drainTo(Collection<? super T> c, int maxElements) {
         processQueue();
-        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
-        int count = deque.drainTo(drain, maxElements);
-        Iterator<Reference<T>> i = drain.iterator();
-        while (i.hasNext()){
-            c.add(i.next().get());
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Reference<T>> drain = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+        return deque.drainTo(drain, maxElements);
         }
-        return count;
     }
-}

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java Mon Oct  3 08:24:23 2011
@@ -19,9 +19,7 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -32,12 +30,10 @@ import java.util.concurrent.TimeUnit;
  */
 class ReferenceBlockingQueue<T> extends ReferencedQueue<T> implements BlockingQueue<T> {
     private final BlockingQueue<Reference<T>> queue;
-    private final Ref type;
     
     ReferenceBlockingQueue(BlockingQueue<Reference<T>> queue, Ref type){
         super(queue, type);
         this.queue = queue;
-        this.type = type;
     }
 
     public void put(T e) throws InterruptedException {
@@ -54,12 +50,16 @@ class ReferenceBlockingQueue<T> extends 
 
     public T take() throws InterruptedException {
         processQueue();
-        return queue.take().get();
+        Reference<T> t = queue.take();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T poll(long timeout, TimeUnit unit) throws InterruptedException {
         processQueue();
-        return queue.poll(timeout, unit).get();
+        Reference<T> t = queue.poll(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public int remainingCapacity() {
@@ -69,24 +69,20 @@ class ReferenceBlockingQueue<T> extends 
 
     public int drainTo(Collection<? super T> c) {
         processQueue();
-        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(queue.size());
-        int count = queue.drainTo(drain);
-        Iterator<Reference<T>> i = drain.iterator();
-        while (i.hasNext()){
-            c.add(i.next().get());
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Reference<T>> dr = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+        return queue.drainTo(dr);   
         }
-        return count;    
-    }
 
     public int drainTo(Collection<? super T> c, int maxElements) {
         processQueue();
-        Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
-        int count = queue.drainTo(drain, maxElements);
-        Iterator<Reference<T>> i = drain.iterator();
-        while (i.hasNext()){
-            c.add(i.next().get());
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Reference<T>> drain = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+        return queue.drainTo(drain, maxElements);
         }
-        return count;
-    }
     
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java Mon Oct  3 08:24:23 2011
@@ -20,7 +20,7 @@ package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
-import java.util.ArrayList;
+import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -49,51 +49,32 @@ import java.util.Set;
  * @see ConcurrentCollections#multiReadCollection(java.util.Collection) 
  * @author Peter Firmstone.
  */
-class ReferenceCollection<T> implements Collection<T>{
-    private Collection<Reference<T>> col;
-    private Ref type;
-    private ReferenceQueue<T> queue;
+class ReferenceCollection<T> extends AbstractCollection<T> implements Collection<T>{
+    private final Collection<Reference<T>> col;
+    private final ReferenceQueuingFactory<T, Reference<T>> rqf;
     
     ReferenceCollection(Collection<Reference<T>> col, Ref type){
-        this.col = col;
-        this.type = type;
-        queue = new ReferenceQueue<T>(); 
+        this(col, new ReferenceProcessor<T>(col, type, type == Ref.STRONG ? null : new ReferenceQueue<T>()));
     }
     
-    ReferenceCollection(Collection<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
+    ReferenceCollection(Collection<Reference<T>> col, 
+            ReferenceQueuingFactory<T, Reference<T>> rqf){
         this.col = col;
-        this.type = type;
-        this.queue = queue;
+        this.rqf = rqf;
     }
     
     void processQueue(){
-        Object t = null;
-        while ( (t = queue.poll()) != null){
-            col.remove(t);
+        rqf.processQueue();
         }
-    }
-    
-    ReferenceQueue<T> getQueue(){
-        return queue;
-    }
     
-    Ref getType(){
-        return type;
+    ReferenceQueuingFactory<T, Reference<T>> getRQF(){
+        return rqf;
     }
     
     Reference<T> wrapObj(T t, boolean enqueue){
-        return ReferenceFactory.create(t, enqueue == true ? queue: null , type);
+        return rqf.referenced(t, enqueue);
     }
     
-    Collection<Reference<T>> wrapColl( Collection<? extends T> c, boolean enqueue){
-        Collection<Reference<T>> refc = new ArrayList<Reference<T>>(c.size());
-        Iterator<? extends T> i = c.iterator();
-        while (i.hasNext()){
-            refc.add(wrapObj(i.next(), false));
-        }
-        return refc;
-    }
-
     public int size() {
         processQueue();
         return col.size();
@@ -122,28 +103,6 @@ class ReferenceCollection<T> implements 
         return new ReferenceIterator<T>(col.iterator());
     }
 
-    public Object[] toArray() {
-        processQueue();
-        int l = col.size();
-        Object[] array = col.toArray(new Object[l]);
-        for (int i = 0; i < l; i++){
-            array[i] = ((Reference)array[i]).get();
-        }
-        return array;
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T> T[] toArray(T[] a) {
-        processQueue();
-        int l = col.size();
-        Reference<T>[] result = col.toArray( new Reference[col.size()]);
-        if ( a.length < result.length) a = (T[]) new Object[result.length];
-        for (int i = 0; i < l; i++){
-            a[i] =  (T) result[i].get();
-        }
-        return a;
-    }
-
     public boolean add(T e) {
         processQueue();
         return col.add(wrapObj(e, true));
@@ -154,24 +113,18 @@ class ReferenceCollection<T> implements 
         return col.remove(wrapObj((T) o, false));
     }
 
+ 
+    @SuppressWarnings("unchecked")
     public boolean containsAll(Collection<?> c) {
         processQueue();
-        return col.containsAll(wrapColl((Collection<T>) c, false));
+        return col.containsAll(new CollectionWrapper<T>((Collection<T>) c, getRQF(), false));
     }
 
+    
+    @SuppressWarnings("unchecked")
     public boolean addAll(Collection<? extends T> c) {
         processQueue();
-        return col.addAll(wrapColl((Collection<T>) c, true));
-    }
-
-    public boolean removeAll(Collection<?> c) {
-        processQueue();
-        return col.removeAll(wrapColl((Collection<T>) c, false));
-    }
-
-    public boolean retainAll(Collection<?> c) {
-        processQueue();
-        return col.retainAll(wrapColl((Collection<T>) c, false));
+        return col.addAll(new CollectionWrapper<T>((Collection<T>) c, getRQF(), true));
     }
 
     public void clear() {
@@ -190,10 +143,7 @@ class ReferenceCollection<T> implements 
         if ( col instanceof List || col instanceof Set ){
             return col.hashCode();
         }
-        int hash = 7;
-        hash = 67 * hash + (this.col != null ? this.col.hashCode() : 0);
-        hash = 67 * hash + (this.type != null ? this.type.hashCode() : 0);
-        return hash;
+        return System.identityHashCode(this);
     }
     
     /**
@@ -216,25 +166,4 @@ class ReferenceCollection<T> implements 
         }
         return false;
     }
-    
-    
-//    @Override
-//    public int hashCode() {
-//        int hash = 7;
-//        hash = 73 * hash + (this.col != null ? this.col.hashCode() : 0);
-//        hash = 73 * hash + (this.type != null ? this.type.hashCode() : 0);
-//        hash = 73 * hash + (this.getClass().hashCode());
-//        return hash;
-//    }
-//    
-//    @Override
-//    public boolean equals(Object o){
-//        if ( o == this ) return true;
-//        if ( hashCode() != o.hashCode()) return false;
-//        if (!( o instanceof Collection)) return false;
-//        ReferenceCollection that = (ReferenceCollection) o;
-//        if ( type.equals(that.type) && col.equals(that.col)) return true;
-//        return false;
-//    }
-    
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java Mon Oct  3 08:24:23 2011
@@ -29,11 +29,12 @@ import java.util.Comparator;
 class ReferenceComparator<T> implements Comparator<Reference<T>> {
     private final Comparator<? super T> comparator;
     ReferenceComparator(Comparator<? super T> comparator){
+        if ( comparator == null ) throw new IllegalArgumentException("Null value prohibited");
         this.comparator = comparator;
     }
 
     public int compare(Reference<T> o1, Reference<T> o2) {
-        return comparator.compare(o1.get(), o2.get());
+        return comparator.compare( o1 != null ? o1.get() : null, o2 != null ? o2.get() : null);
     }
 
     @Override

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java Mon Oct  3 08:24:23 2011
@@ -20,7 +20,6 @@ package org.apache.river.impl.util;
 
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -49,7 +48,6 @@ import java.util.concurrent.ConcurrentMa
  */
 class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V> implements ConcurrentMap<K, V> {
 
-    
     // ConcurrentMap must be protected from null values?  It changes it's behaviour, is that a problem?
     private final ConcurrentMap<Reference<K>, Reference<V>> map;
     
@@ -58,50 +56,52 @@ class ReferenceConcurrentMap<K, V> exten
         this.map = map;
     }
     
-    ReferenceConcurrentMap(ConcurrentMap<Reference<K>,Reference<V>> map,
-            Ref key, Ref val, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
-        super (map, key, val, keyQueue, valQueue);
+    ReferenceConcurrentMap(ConcurrentMap<Reference<K>, Reference<V>> map,
+            ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
+        super(map, krqf, vrqf);
         this.map = map;
     }
     
     public V putIfAbsent(K key, V value) {
         processQueue();  //may be a slight delay before atomic putIfAbsent
-        if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
         Reference<K> k = wrapKey(key, true);
         Reference<V> v = wrapVal(value, true);
         Reference<V> val = map.putIfAbsent(k, v);
-        if ( val != null ) return val.get();
+        while ( val != null ) {
+            V existed = val.get();
+            // We hold a strong reference to value, so 
+            if ( existed == null ){
+                // Stale reference must be replaced: it has been garbage collected but hasn't
+                // been removed, so we must treat it as though the entry doesn't exist.
+                if ( map.replace(k, val, v)){
+                    // replace successful
+                    return null; // Because officially there was no record.
+                } else {
+                    // Another thread may have replaced it.
+                    val = map.putIfAbsent(k, v);
+                }
+            } else {
+                return existed;
+            }
+        }
         return null;
     }
 
+    @SuppressWarnings("unchecked")
     public boolean remove(Object key, Object value) {
         processQueue();
-        if (key == null || value == null) return false;
-        @SuppressWarnings("unchecked")
-        Reference<K> k = wrapKey( (K) key, false);
-        @SuppressWarnings("unchecked")
-        Reference<V> v = wrapVal( (V) value, false);
-        return map.remove(k, v);
+        return map.remove(wrapKey((K) key, false), wrapVal((V) value, false));
     }
 
     public boolean replace(K key, V oldValue, V newValue) {
         processQueue();
-        if (key == null || oldValue == null || newValue == null) return false;
-        Reference<K> k = wrapKey(key, true);
-        Reference<V> vOld = wrapVal(oldValue, false);
-        Reference<V> vNew = wrapVal(newValue, true);
-        return map.replace(k, vOld, vNew);
+        return map.replace(wrapKey(key, true), wrapVal(oldValue, false), wrapVal(newValue, false));
     }
 
     public V replace(K key, V value) {
         processQueue();
-        if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
-        Reference<K> k = wrapKey(key, true);
-        Reference<V> v = wrapVal(value, true);
-        Reference<V> val = map.replace(k, v);
+        Reference<V> val = map.replace(wrapKey(key, true), wrapVal(value, true));
         if ( val != null ) return val.get();
         return null;
     }
-    
-    
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java Mon Oct  3 08:24:23 2011
@@ -19,7 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.Comparator;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
@@ -33,24 +32,19 @@ import java.util.concurrent.ConcurrentNa
  */
 class ReferenceConcurrentNavigableMap<K,V> 
 extends ReferenceConcurrentMap<K,V> implements ConcurrentNavigableMap<K,V>{
-    private ConcurrentNavigableMap<Reference<K>,Reference<V>> map;
-    private Ref keyRef;
-    private Ref valRef;
+    private final ConcurrentNavigableMap<Reference<K>,Reference<V>> map;
     
     ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
         super(map, keyRef, valRef);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
 
     ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map,
-            Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
-        super(map, keyRef, valRef, keyQueue, valQueue);
+            ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
+        super(map, krqf, vrqf);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
+    
     public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
@@ -60,208 +54,172 @@ extends ReferenceConcurrentMap<K,V> impl
                 wrapKey(toKey, false),
                 toInclusive
             ), 
-            keyRef, 
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.headMap(wrapKey(toKey, false), inclusive),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.tailMap(wrapKey(fromKey, false), inclusive),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.subMap(wrapKey(fromKey, false), wrapKey(toKey, false)),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> headMap(K toKey) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.headMap(wrapKey(toKey, false)),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.tailMap(wrapKey(fromKey, false)),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public ConcurrentNavigableMap<K, V> descendingMap() {
         processQueue();
         return new ReferenceConcurrentNavigableMap<K,V>(
             map.descendingMap(),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
-
     public NavigableSet<K> navigableKeySet() {
         processQueue();
-        return new ReferenceNavigableSet<K>(map.navigableKeySet(), keyRef, getKeyQueue());
+        return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF());
     }
 
-
     public NavigableSet<K> keySet() {
         processQueue();
-        return new ReferenceNavigableSet<K>(map.keySet(), keyRef, getKeyQueue());
+        return new ReferenceNavigableSet<K>(map.keySet(), getKeyRQF());
     }
 
-
     public NavigableSet<K> descendingKeySet() {
         processQueue();
-        return new ReferenceNavigableSet<K>(map.descendingKeySet(), keyRef, getKeyQueue());
+        return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF());
     }
 
-
     public Entry<K, V> lowerEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.lowerEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public K lowerKey(K key) {
         processQueue();
-        return map.lowerKey(wrapKey(key, false)).get();
+        Reference<K> k = map.lowerKey(wrapKey(key, false));
+        if ( k != null ) return k.get();
+        return null;
     }
 
-
     public Entry<K, V> floorEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.floorEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public K floorKey(K key) {
         processQueue();
-        return map.floorKey(wrapKey(key, false)).get();
+        Reference<K> k = map.floorKey(wrapKey(key, false));
+        if ( k != null ) return k.get();
+        return null;
     }
 
-
     public Entry<K, V> ceilingEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.ceilingEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public K ceilingKey(K key) {
         processQueue();
-        return map.ceilingKey(wrapKey(key, false)).get();
+        Reference<K> k = map.ceilingKey(wrapKey(key, false));
+        if ( k != null ) return k.get();
+        return null;
     }
 
-
     public Entry<K, V> higherEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.higherEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public K higherKey(K key) {
         processQueue();
-        return map.higherKey(wrapKey(key, false)).get();
+        Reference<K> k = map.higherKey(wrapKey(key, false));
+        if ( k != null ) return k.get();
+        return null;
     }
 
-
     public Entry<K, V> firstEntry() {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.firstEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public Entry<K, V> lastEntry() {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.lastEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public Entry<K, V> pollFirstEntry() {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.pollFirstEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-
     public Entry<K, V> pollLastEntry() {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.pollLastEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
-    
     @SuppressWarnings("unchecked")
     public Comparator<? super K> comparator() {
         processQueue();
@@ -272,17 +230,17 @@ extends ReferenceConcurrentMap<K,V> impl
         return null;
     }
 
-
     public K firstKey() {
         processQueue();
-        return map.firstKey().get();
+        Reference<K> k = map.firstKey();
+        if ( k != null ) return k.get();
+        return null;
     }
 
-
     public K lastKey() {
         processQueue();
-        return map.lastKey().get();
+        Reference<K> k = map.lastKey();
+        if ( k != null ) return k.get();
+        return null;
     }
-
-
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java Mon Oct  3 08:24:23 2011
@@ -58,42 +58,58 @@ class ReferenceDeque<T> extends Referenc
 
     public T removeFirst() {
         processQueue();
-        return deque.removeFirst().get();
+        Reference<T> t = deque.removeFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T removeLast() {
         processQueue();
-        return deque.removeLast().get();
+        Reference<T> t = deque.removeLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T pollFirst() {
         processQueue();
-        return deque.pollFirst().get();
+        Reference<T> t = deque.pollFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T pollLast() {
         processQueue();
-        return deque.pollLast().get();
+        Reference<T> t = deque.pollLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T getFirst() {
         processQueue();
-        return deque.getFirst().get();
+        Reference<T> t = deque.getFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T getLast() {
         processQueue();
-        return deque.getLast().get();
+        Reference<T> t = deque.getLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T peekFirst() {
         processQueue();
-        return deque.peekFirst().get();
+        Reference<T> t = deque.peekFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T peekLast() {
         processQueue();
-        return deque.peekLast().get();
+        Reference<T> t = deque.peekLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public boolean removeFirstOccurrence(Object o) {
@@ -118,7 +134,9 @@ class ReferenceDeque<T> extends Referenc
 
     public T pop() {
         processQueue();
-        return deque.pop().get();
+        Reference<T> t = deque.pop();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public Iterator<T> descendingIterator() {

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java Mon Oct  3 08:24:23 2011
@@ -19,7 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -28,27 +27,30 @@ import java.util.Map.Entry;
  * @author Peter Firmstone.
  */
 class ReferenceEntryFacade<K, V> implements Map.Entry<K, V> {
-    private final Map.Entry<Reference<K>, Reference<V>> entry;
-    private final ReferenceQueue<V> queue;
-    private final Ref valRef;
+    private final Entry<Reference<K>, Reference<V>> entry;
+    private final ReferenceQueuingFactory<V, Reference<V>> rqf;
 
-    ReferenceEntryFacade(Map.Entry<Reference<K>, Reference<V>> entry,
-                                        Ref valRef, ReferenceQueue<V> queue) {
+    ReferenceEntryFacade(Entry<Reference<K>, Reference<V>> entry, ReferenceQueuingFactory<V, Reference<V>> rqf){
         this.entry = entry;
-        this.queue = queue;
-        this.valRef = valRef;
+        this.rqf = rqf;
     }
     
     public K getKey() {
-        return entry.getKey().get();
+        Reference<K> k = entry.getKey();
+        if ( k != null ) return k.get();
+        return null;
     }
 
     public V getValue() {
-        return entry.getValue().get();
+        Reference<V> v = entry.getValue();
+        if ( v != null ) return v.get();
+        return null;
     }
 
     public V setValue(V value) {
-        return entry.setValue(wrapVal(value, true)).get();
+        Reference<V> v = entry.setValue(wrapVal(value, true));
+        if ( v != null ) return v.get();
+        return null;
     }
 
     /**
@@ -58,7 +60,7 @@ class ReferenceEntryFacade<K, V> impleme
      */
     @Override
     public int hashCode() {
-        return (getKey()==null   ? 0 : getKey().hashCode()) ^
+        return (getKey()==null ? 0 : getKey().hashCode()) ^
              (getValue()==null ? 0 : getValue().hashCode());
     }
 
@@ -78,7 +80,7 @@ class ReferenceEntryFacade<K, V> impleme
     }
     
     private Reference<V> wrapVal(V val, boolean enque) {
-        return ReferenceFactory.create(val, enque == true ? queue : null, valRef);
+        return rqf.referenced(val, enque);
 }
 
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java Mon Oct  3 08:24:23 2011
@@ -29,6 +29,7 @@ class ReferenceIterator<T> implements It
     private final Iterator<Reference<T>> iterator;
 
     ReferenceIterator(Iterator<Reference<T>> iterator) {
+        if ( iterator == null ) throw new IllegalArgumentException("iterator cannot be null");
         this.iterator = iterator;
     }
 
@@ -37,7 +38,9 @@ class ReferenceIterator<T> implements It
     }
 
     public T next() {
-        return iterator.next().get();
+        Reference<T> t = iterator.next();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public void remove() {

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java Mon Oct  3 08:24:23 2011
@@ -19,7 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -52,8 +51,8 @@ class ReferenceList<T> extends Reference
         this.list = list;
     }
     
-    private ReferenceList(List<Reference<T>> list, Ref type, ReferenceQueue<T> queue){
-        super(list, type, queue);
+    ReferenceList(List<Reference<T>> list, ReferenceQueuingFactory<T, Reference<T>> rqf){
+        super(list, rqf);
         this.list = list;
     }
 
@@ -99,19 +98,24 @@ class ReferenceList<T> extends Reference
         return hashCode;
     }
 
+    @SuppressWarnings("unchecked")
     public boolean addAll(int index, Collection<? extends T> c) {
         processQueue();
-        return list.addAll(index, wrapColl(c, true));
+        return list.addAll(index, new CollectionWrapper<T>((Collection<T>) c, getRQF(), true));
     }
 
     public T get(int index) {
         processQueue();
-        return list.get(index).get();
+        Reference<T> r = list.get(index);
+        if (r != null) return r.get();
+        return null;
     }
 
     public T set(int index, T element) {
         processQueue();
-        return list.set(index, wrapObj(element, true)).get();
+        Reference<T> r = list.set(index, wrapObj(element, true));
+        if (r != null) return r.get();
+        return null;
     }
 
     public void add(int index, T element) {
@@ -121,7 +125,9 @@ class ReferenceList<T> extends Reference
 
     public T remove(int index) {
         processQueue();
-        return list.remove(index).get();
+        Reference<T> r = list.remove(index);
+        if (r != null) return r.get();
+        return null;
     }
 
     @SuppressWarnings("unchecked")
@@ -138,25 +144,28 @@ class ReferenceList<T> extends Reference
 
     public ListIterator<T> listIterator() {
         processQueue();
-        return new ListIteratorWrap<T>(list.listIterator(), this);
+        return new ReferenceListIterator<T>(list.listIterator(), getRQF());
     }
 
     public ListIterator<T> listIterator(int index) {
         processQueue();
-        return new ListIteratorWrap<T>(list.listIterator(index), this);
+        return new ReferenceListIterator<T>(list.listIterator(index), getRQF());
     }
 
     public List<T> subList(int fromIndex, int toIndex) {
         processQueue();
-        List<T> sub = new ReferenceList<T>(list.subList(fromIndex, toIndex), getType(), getQueue());
+        List<T> sub = new ReferenceList<T>(list.subList(fromIndex, toIndex), getRQF());
         return sub;
     }
     
-    private  class ListIteratorWrap<T> implements ListIterator<T>{
+    private  class ReferenceListIterator<T> implements ListIterator<T>{
         ListIterator<Reference<T>> iterator;
-        ReferenceCollection<T> col;
-        private ListIteratorWrap(ListIterator<Reference<T>> iterator, ReferenceCollection<T> c){
+        ReferenceQueuingFactory<T, Reference<T>> rqf;
+        private ReferenceListIterator(ListIterator<Reference<T>> iterator, ReferenceQueuingFactory<T, Reference<T>> rqf ){
+            if ( iterator == null || rqf == null ) throw 
+            new NullPointerException("Null iterator or reference queuing factory not allowed");
             this.iterator = iterator;
+            this.rqf = rqf;
         }
 
         public boolean hasNext() {
@@ -164,7 +173,9 @@ class ReferenceList<T> extends Reference
         }
 
         public T next() {
-            return iterator.next().get();
+            Reference<T> t = iterator.next();
+            if ( t != null ) return t.get();
+            return null;
         }
 
         public boolean hasPrevious() {
@@ -172,7 +183,9 @@ class ReferenceList<T> extends Reference
         }
 
         public T previous() {
-            return iterator.previous().get();
+            Reference<T> t = iterator.previous();
+            if ( t != null ) return t.get();
+            return null;
         }
 
         public int nextIndex() {
@@ -188,11 +201,11 @@ class ReferenceList<T> extends Reference
         }
 
         public void set(T e) {
-            iterator.set(col.wrapObj(e, true));
+            iterator.set(rqf.referenced(e, true));
         }
 
         public void add(T e) {
-            iterator.add(col.wrapObj( e, true));
+            iterator.add(rqf.referenced( e, true));
         }       
     }
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java Mon Oct  3 08:24:23 2011
@@ -22,8 +22,6 @@ import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.util.AbstractMap;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -54,47 +52,44 @@ import java.util.Set;
  * @param <V> 
  * @author Peter Firmstone.
  */
-class ReferenceMap<K, V> extends AbstractMap<K, V> implements Map<K, V>, ReferenceQueueProcessor {
+class ReferenceMap<K, V> extends AbstractMap<K, V> implements Map<K, V> {
 
-    // ConcurrentHashMap must be protected from null values;
-    private final Ref keyRef;
-    private final Ref valRef;
     private final Map<Reference<K>, Reference<V>> map;
-    private final ReferenceQueue<K> keyQueue;
-    private final ReferenceQueue<V> valQueue;
+    // The ReferenceQueuingFactory instances handle the ReferenceQueue locking policy.
+    private final ReferenceQueuingFactory<K, Reference<K>> krqf;
+    private final ReferenceQueuingFactory<V, Reference<V>> vrqf;
+    // Views of values and keys, reduces object creation.
+    private final Collection<V> values;
+    private final Set<K> keys;
+    private final Set<Entry<K,V>> entrys;
     
     ReferenceMap(Map<Reference<K>,Reference<V>> map, Ref key, Ref val){
-        super();
-        if (map == null || key == null || val == null ) throw new IllegalArgumentException("Null not allowed");
-        this.map = map;
-        keyQueue = new ReferenceQueue<K>();
-        valQueue = new ReferenceQueue<V>();
-        keyRef = key;
-        valRef = val;
-    }
-    ReferenceMap(Map<Reference<K>,Reference<V>> map, Ref key, Ref val, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
-        super();
-        if (map == null || key == null || val == null ) throw new IllegalArgumentException("Null not allowed");
+        this(map, 
+                new ReferenceProcessor<K>(map.keySet(), key, new ReferenceQueue<K>()),
+                new ReferenceProcessor<V>(map.values(), val, new ReferenceQueue<V>())
+                );
+    }
+    
+    ReferenceMap(Map<Reference<K>, Reference<V>> map, ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
         this.map = map;
-        this.keyQueue = keyQueue;
-        this.valQueue = valQueue;
-        keyRef = key;
-        valRef = val;
+        this.krqf = krqf;
+        this.vrqf = vrqf;
+        values = new ReferenceCollection<V>(this.map.values(), vrqf);
+        keys = new ReferenceSet<K>(this.map.keySet(), krqf);
+        // We let this escape during construction, but it's package private only
+        // and doesn't escape the package.
+        entrys = new EntrySetFacade<Entry<K,V>, Entry<Reference<K>,Reference<V>>>(
+                map.entrySet(), 
+                new EntryFacadeConverter<K,V>(krqf, vrqf)
+                );
     }
-     ReferenceQueue<V> getValQueue(){
-         return valQueue;
-     }
      
-     ReferenceQueue<K> getKeyQueue(){
-         return keyQueue;
+    ReferenceQueuingFactory<K, Reference<K>> getKeyRQF(){
+        return krqf;
      }
      
-     Ref getKeyRef(){
-         return keyRef;
-    }
-    
-     Ref getValRef(){
-         return valRef;
+    ReferenceQueuingFactory<V, Reference<V>> getValRQF(){
+        return vrqf;
     }
     
     /**
@@ -108,19 +103,12 @@ class ReferenceMap<K, V> extends Abstrac
     @SuppressWarnings(value = "unchecked")
     public boolean containsKey(Object key) {
         processQueue();
-        if (key == null) {
-            return false;
+        return map.containsKey(wrapKey((K)key, false));
         }
-        Reference<K> kRef = wrapKey((K) key, false);
-        return map.containsKey(kRef);
-    }
 
     @SuppressWarnings("unchecked")
     public boolean containsValue(Object value) {
         processQueue();
-        if (value == null) {
-            return false;
-        }
         return map.containsValue(wrapVal((V) value, false));
     }
 
@@ -134,15 +122,10 @@ class ReferenceMap<K, V> extends Abstrac
      * mapping from the map, via the Iterator.remove, Set.remove, removeAll, 
      * retainAll and clear operations. 
      * 
-     * This method does not support the add or addAll operations.
-     * 
      * @return
      */
     public Set<Entry<K,V>> entrySet() {
-        return new EntrySetFacade<Entry<K,V>, Entry<Reference<K>,Reference<V>>>(
-                map.entrySet(), 
-                new EntryFacadeConverter<K,V>(this)
-                );
+        return entrys;
     }
 
     /**
@@ -150,14 +133,9 @@ class ReferenceMap<K, V> extends Abstrac
      */
     public V get(Object key) {
         processQueue();
-        if (key == null) {
-            return null;
-        }
         @SuppressWarnings(value = "unchecked")
         Reference<V> refVal = map.get(wrapKey((K) key, false));
-        if (refVal != null) {
-            return refVal.get();
-        }
+        if (refVal != null) return refVal.get();
         return null;
     }
 
@@ -176,19 +154,18 @@ class ReferenceMap<K, V> extends Abstrac
      */
     public Set<K> keySet() {
         processQueue();
-        return new ReferenceSet<K>(map.keySet(), keyRef, keyQueue);
+        return keys;
     }
 
     public void processQueue() {
-        Reference r = null;
-        while ((r = keyQueue.poll()) != null) {
-            map.remove(r);
-        }
-        Collection<Reference<V>> values = map.values();
-        while ((r = valQueue.poll()) != null) {
-            values.remove(r); //Removes mapping.
+        // If someone else is cleaning out the trash, don't bother waiting;
+        // the underlying Map is responsible for its own synchronization.
+        // Null values or keys may be returned as a result, or a
+        // ConcurrentMap that contained a value may no longer contain
+        // it after checking.
+        krqf.processQueue();
+        vrqf.processQueue();
         }
-    }
 
     /**
      * Associates value with given key, returning value previously associated
@@ -199,31 +176,10 @@ class ReferenceMap<K, V> extends Abstrac
      */
     public V put(K key, V value) {
         processQueue();
-        if (key == null) {
+        Reference<V> val = map.put(wrapKey(key, false),wrapVal(value, false));
+        if (val != null) return val.get();
             return null;
         }
-        Reference<V> val = map.put(wrapKey(key, true), wrapVal(value, true));
-        if (val != null) {
-            return val.get();
-        }
-        return null;
-    }
-
-    /**
-     * 
-     * @param m
-     */
-    public void putAll(Map<? extends K, ? extends V> m) {
-        if (m.isEmpty()) return;
-        Map<Reference<K>, Reference<V>> refMap 
-            = new HashMap<Reference<K>, Reference<V>>(m.size());
-        Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator();
-        while (i.hasNext()){
-            Map.Entry<? extends K, ? extends V> ent = i.next();
-            refMap.put(wrapKey(ent.getKey(), true),wrapVal(ent.getValue(), true));
-        }
-        map.putAll(refMap);
-    }
 
     /**
      * Removes association for given key, returning value previously associated
@@ -231,14 +187,9 @@ class ReferenceMap<K, V> extends Abstrac
      */
     public V remove(Object key) {
         processQueue();
-        if (key == null) {
-            return null;
-        }
         @SuppressWarnings(value = "unchecked")
         Reference<V> val = map.remove(wrapKey((K) key, false));
-        if (val != null) {
-            return val.get();
-        }
+        if (val != null) return val.get();
         return null;
     }
 
@@ -252,15 +203,15 @@ class ReferenceMap<K, V> extends Abstrac
      */
     public Collection<V> values() {
         processQueue();
-        return new ReferenceCollection<V>(map.values(), valRef, valQueue);
+        return values;
     }
 
     Reference<V> wrapVal(V val, boolean enque) {
-        return ReferenceFactory.create(val, enque == true ? valQueue : null, valRef);
+        return vrqf.referenced(val, enque);
     }
 
     Reference<K> wrapKey(K key, boolean enque) {
-        return ReferenceFactory.create(key, enque == true ? keyQueue : null, keyRef);
+        return krqf.referenced(key, enque);
     }
     
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java Mon Oct  3 08:24:23 2011
@@ -19,7 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -30,86 +29,84 @@ import java.util.NavigableSet;
  */
 class ReferenceNavigableMap<K,V> extends ReferenceSortedMap<K,V> implements NavigableMap<K,V> {
     private final NavigableMap<Reference<K>, Reference<V>> map;
-    private final Ref keyRef;
-    private final Ref valRef;
     
     ReferenceNavigableMap(NavigableMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
         super(map, keyRef, valRef);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
     
-    ReferenceNavigableMap( NavigableMap<Reference<K>, Reference<V>> map,
-            Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
-        super(map, keyRef, valRef, keyQueue, valQueue);
+    ReferenceNavigableMap(NavigableMap<Reference<K>, Reference<V>> map,
+            ReferenceQueuingFactory<K, Reference<K>> krqf,
+            ReferenceQueuingFactory<V, Reference<V>> vrqf){
+        super(map, krqf, vrqf);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
 
     public Entry<K, V> lowerEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.lowerEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
     public K lowerKey(K key) {
         processQueue();
-        return map.lowerKey(wrapKey(key, false)).get();
+        Reference<K> k = map.lowerKey(wrapKey(key, false));
+        if (k != null) return k.get();
+        return null;
     }
 
     public Entry<K, V> floorEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.floorEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
     public K floorKey(K key) {
         processQueue();
-        return map.floorKey(wrapKey(key, false)).get();
+        Reference<K> k = map.floorKey(wrapKey(key, false));
+        if (k != null) return k.get();
+        return null;
     }
 
     public Entry<K, V> ceilingEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.ceilingEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
     public K ceilingKey(K key) {
         processQueue();
-        return map.ceilingKey(wrapKey(key, false)).get();
+        Reference<K> k = map.ceilingKey(wrapKey(key, false));
+        if (k != null) return k.get();
+        return null;
     }
 
     public Entry<K, V> higherEntry(K key) {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.higherEntry(wrapKey(key, false)),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
     public K higherKey(K key) {
         processQueue();
-        return map.higherKey(wrapKey(key, false)).get();
+        Reference<K> k = map.higherKey(wrapKey(key, false));
+        if (k != null) return k.get();
+        return null;
     }
 
     public Entry<K, V> firstEntry() {
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.firstEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
@@ -117,8 +114,7 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.lastEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
@@ -126,8 +122,7 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.pollFirstEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
@@ -135,8 +130,7 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceEntryFacade<K,V>(
             map.pollLastEntry(),
-            valRef, 
-            getValQueue()
+            getValRQF()
         );
     }
 
@@ -144,21 +138,19 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceNavigableMap<K,V>(
                 map.descendingMap(),
-                keyRef,
-                valRef,
-                getKeyQueue(),
-                getValQueue()
+                getKeyRQF(),
+                getValRQF()
                 );
     }
 
     public NavigableSet<K> navigableKeySet() {
         processQueue();
-        return new ReferenceNavigableSet<K>(map.navigableKeySet(), keyRef, getKeyQueue());
+        return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF());
     }
 
     public NavigableSet<K> descendingKeySet() {
         processQueue();
-        return new ReferenceNavigableSet<K>(map.descendingKeySet(), keyRef, getKeyQueue());
+        return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF());
     }
 
     public NavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
@@ -170,10 +162,8 @@ class ReferenceNavigableMap<K,V> extends
                 wrapKey(toKey, false), 
                 toInclusive
             ),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
 
     }
@@ -182,10 +172,8 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceNavigableMap<K,V>(
             map.headMap(wrapKey(toKey, false),inclusive),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 
@@ -193,10 +181,8 @@ class ReferenceNavigableMap<K,V> extends
         processQueue();
         return new ReferenceNavigableMap<K,V>(
             map.tailMap(wrapKey(fromKey, false),inclusive),
-            keyRef,
-            valRef,
-            getKeyQueue(),
-            getValQueue()
+            getKeyRQF(),
+            getValRQF()
         );
     }
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java Mon Oct  3 08:24:23 2011
@@ -31,53 +31,62 @@ import java.util.NavigableSet;
 public class ReferenceNavigableSet<T> 
     extends ReferenceSortedSet<T> implements NavigableSet<T> {
     private final NavigableSet<Reference<T>> set;
-    private final Ref type;
     
     public ReferenceNavigableSet(NavigableSet<Reference<T>> set, Ref type){
         super(set, type);
         this.set = set;
-        this.type = type;
     }
     
-    ReferenceNavigableSet(NavigableSet<Reference<T>> set, Ref type, ReferenceQueue<T> queue){
-        super(set, type, queue);
+    ReferenceNavigableSet(NavigableSet<Reference<T>> set, ReferenceQueuingFactory<T, Reference<T>> rqf){
+        super(set, rqf);
         this.set = set;
-        this.type = type;
     }
 
     public T lower(T e) {
         processQueue();
-        return set.lower(wrapObj(e, false)).get();
+        Reference<T> t = set.lower(wrapObj(e, false));
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T floor(T e) {
         processQueue();
-        return set.floor(wrapObj(e, false)).get();
+        Reference<T> t = set.floor(wrapObj(e, false));
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T ceiling(T e) {
         processQueue();
-        return set.ceiling(wrapObj(e, false)).get();
+        Reference<T> t = set.ceiling(wrapObj(e, false));
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T higher(T e) {
         processQueue();
-        return set.higher(wrapObj(e, false)).get();
+        Reference<T> t = set.higher(wrapObj(e, false));
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T pollFirst() {
         processQueue();
-        return set.pollFirst().get();
+        Reference<T> t = set.pollFirst();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T pollLast() {
         processQueue();
-        return set.pollLast().get();
+        Reference<T> t = set.pollLast();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public NavigableSet<T> descendingSet() {
         processQueue();
-        return new ReferenceNavigableSet<T>(set.descendingSet(), type, getQueue());
+        return new ReferenceNavigableSet<T>(set.descendingSet(), getRQF());
     }
 
     public Iterator<T> descendingIterator() {
@@ -93,26 +102,19 @@ public class ReferenceNavigableSet<T> 
                 fromInclusive, 
                 wrapObj(toElement, false), 
                 toInclusive
-            ), 
-            type, 
-            getQueue()
-        );
+            ), getRQF());
     }
 
     public NavigableSet<T> headSet(T toElement, boolean inclusive) {
         processQueue();
         return new ReferenceNavigableSet<T>(
-                set.headSet(wrapObj(toElement, false), inclusive), 
-                type, getQueue()
-                );
+                set.headSet(wrapObj(toElement, false), inclusive), getRQF());
     }
 
     public NavigableSet<T> tailSet(T fromElement, boolean inclusive) {
         processQueue();
         return new ReferenceNavigableSet<T>(
-                set.tailSet(wrapObj(fromElement, false), inclusive), 
-                type, getQueue()
-                );
+                set.tailSet(wrapObj(fromElement, false), inclusive), getRQF());
     }
     
 }

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java Mon Oct  3 08:24:23 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collection;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Wraps referents in references of a configured {@link Ref} type and removes
+ * references polled from its ReferenceQueue from the backing collection.
+ * @param <T> the referent type
+ * @author peter
+ */
+class ReferenceProcessor<T> implements ReferenceQueuingFactory<T, Reference<T>> {
+    // Backing collection, queue (null when type is Ref.STRONG), reference type and purge lock.
+    private final Collection<Reference<T>> col;
+    private final ReferenceQueue<T> queue;
+    private final Ref type;
+    private final Lock queueLock;
+    /** Throws NullPointerException if col or type is null; the queue is dropped for Ref.STRONG. */
+    ReferenceProcessor(Collection<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
+        if (col == null || type == null ) throw new NullPointerException("collection or reference type cannot be null");
+        this.col = col;
+        this.type = type;
+        this.queue = type == Ref.STRONG ? null : queue;
+        queueLock = new ReentrantLock();
+    }
+    /** Not supported by this implementation: always throws UnsupportedOperationException. */
+    @Override
+    public T pseudoReferent(Reference<T> u) {
+        throw new UnsupportedOperationException("Not supported.");
+    }
+    /** Returns null for a null w; otherwise delegates to ReferenceFactory.create, passing the queue only when enque is true. */
+    @Override
+    public Reference<T> referenced(T w, boolean enque) {
+        if (w == null) return null;
+        return ReferenceFactory.create(w, enque == true ? queue : null, type);
+    }
+    /** Purges polled references from the collection; a no-op when there is no queue or another thread holds the lock. */
+    @Override
+    public void processQueue() {
+        if (queue == null) return;
+        Object t = null;
+        /*
+         * tryLock is used instead of a blocking lock: if another thread is
+         * already removing the garbage, this thread skips the purge rather
+         * than waiting, so access to the underlying collection is not held
+         * up here.  The cost is that some client threads will occasionally
+         * receive null values, but this is a small price to pay.
+         * Might have to employ the null object pattern.
+         */
+        if ( queueLock.tryLock()){
+            try {
+                while ( (t = queue.poll()) != null){
+                    col.remove(t);
+                }
+            }finally{
+                queueLock.unlock();
+            }
+        }
+    }
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java Mon Oct  3 08:24:23 2011
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.impl.util;
-
-/**
- *
- * @author peter
- */
-interface ReferenceQueueProcessor {
-
-    void processQueue();
-    
-}

Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java Mon Oct  3 08:24:23 2011
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+/**
+ * Processes ReferenceQueues, wraps objects in references, and makes references appear as their referent.
+ * @param <O> the object (referent) type
+ * @param <R> the reference type that wraps an O
+ * @author Peter Firmstone
+ */
+interface ReferenceQueuingFactory<O, R> {
+    /** Presumably returns a referent-like view of reference u; implementations may not support it - TODO confirm. */
+    O pseudoReferent(R u);
+    /** Wraps w in a reference; enque presumably selects whether the reference is registered with a queue - TODO confirm. */
+    R referenced(O w, boolean enque);
+    /** Processes the implementation's ReferenceQueue, if any. */
+    void processQueue();
+    
+}

Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java Mon Oct  3 08:24:23 2011
@@ -19,8 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 
@@ -37,8 +35,8 @@ class ReferenceSet<T> extends ReferenceC
         super(col, type);
     }
     
-    ReferenceSet(Set<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
-        super(col, type, queue);
+    ReferenceSet(Set<Reference<T>> col, ReferenceQueuingFactory<T, Reference<T>> rqf){
+        super(col, rqf);
     }
     
     public boolean equals(Object o) {

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java Mon Oct  3 08:24:23 2011
@@ -19,7 +19,6 @@
 package org.apache.river.impl.util;
 
 import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
 import java.util.Comparator;
 import java.util.SortedMap;
 
@@ -31,22 +30,17 @@ import java.util.SortedMap;
  */
 class ReferenceSortedMap<K,V> extends ReferenceMap<K,V> implements SortedMap<K,V>{
     private SortedMap<Reference<K>, Reference<V>> map;
-    private Ref keyRef;
-    private Ref valRef;
     
     ReferenceSortedMap(SortedMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
         super(map, keyRef, valRef);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
     
-    ReferenceSortedMap( SortedMap<Reference<K>, Reference<V>> map,
-            Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
-        super(map, keyRef, valRef, keyQueue, valQueue);
+    ReferenceSortedMap(SortedMap<Reference<K>, Reference<V>> map, 
+            ReferenceQueuingFactory<K, Reference<K>> krqf,
+            ReferenceQueuingFactory<V, Reference<V>> vrqf){
+        super(map, krqf, vrqf);
         this.map = map;
-        this.keyRef = keyRef;
-        this.valRef = valRef;
     }
 
     @SuppressWarnings("unchecked")
@@ -63,10 +57,8 @@ class ReferenceSortedMap<K,V> extends Re
         processQueue();
         return new ReferenceSortedMap<K,V>(
                 map.subMap(wrapKey(fromKey, false), wrapKey(toKey, false)),
-                keyRef,
-                valRef,
-                getKeyQueue(),
-                getValQueue()
+                getKeyRQF(),
+                getValRQF()
                 );
     }
 
@@ -75,10 +67,8 @@ class ReferenceSortedMap<K,V> extends Re
         processQueue();
         return new ReferenceSortedMap<K,V>(
                 map.headMap(wrapKey(toKey, false)),
-                keyRef,
-                valRef,
-                getKeyQueue(),
-                getValQueue()
+                getKeyRQF(),
+                getValRQF()
                 );
     }
 
@@ -87,23 +77,25 @@ class ReferenceSortedMap<K,V> extends Re
         processQueue();
         return new ReferenceSortedMap<K,V>(
                 map.tailMap(wrapKey(fromKey, false)),
-                keyRef,
-                valRef,
-                getKeyQueue(),
-                getValQueue()
+                getKeyRQF(),
+                getValRQF()
                 );
     }
 
 
     public K firstKey() {
         processQueue();
-        return map.firstKey().get();
+        Reference<K> k = map.firstKey();
+        if (k != null) return k.get();
+        return null;
     }
 
 
     public K lastKey() {
         processQueue();
-        return map.lastKey().get();
+        Reference<K> k = map.lastKey();
+        if (k != null) return k.get();
+        return null;
     }
     
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java Mon Oct  3 08:24:23 2011
@@ -35,17 +35,15 @@ import java.util.SortedSet;
  */
 class ReferenceSortedSet<T> extends ReferenceSet<T> implements SortedSet<T> {
     private final SortedSet<Reference<T>> set;
-    private final Ref type;
+
     ReferenceSortedSet( SortedSet<Reference<T>> set, Ref type){
         super(set, type);
         this.set = set;
-        this.type = type;
     }
     
-    ReferenceSortedSet(SortedSet<Reference<T>> set, Ref type, ReferenceQueue<T> queue){
-        super(set, type, queue);
+    ReferenceSortedSet(SortedSet<Reference<T>> set, ReferenceQueuingFactory<T, Reference<T>> rqf){
+        super(set, rqf);
         this.set = set;
-        this.type = type;
     }
 
     @SuppressWarnings("unchecked")
@@ -62,29 +60,33 @@ class ReferenceSortedSet<T> extends Refe
         processQueue();
         Reference<T> from = wrapObj(fromElement, false);
         Reference<T> to = wrapObj(toElement, false);
-        return new ReferenceSortedSet<T>( set.subSet(from, to), type, getQueue());
+        return new ReferenceSortedSet<T>( set.subSet(from, to), getRQF());
     }
 
     public SortedSet<T> headSet(T toElement) {
         processQueue();
         Reference<T> to = wrapObj(toElement, false);
-        return new ReferenceSortedSet<T>(set.headSet(to), type, getQueue());
+        return new ReferenceSortedSet<T>(set.headSet(to), getRQF());
     }
 
     public SortedSet<T> tailSet(T fromElement) {
         processQueue();
         Reference<T> from = wrapObj(fromElement, false);
-        return new ReferenceSortedSet<T>(set.tailSet(from), type, getQueue());
+        return new ReferenceSortedSet<T>(set.tailSet(from), getRQF());
     }
 
     public T first() {
         processQueue();
-        return set.first().get();
+        Reference<T> t = set.first();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T last() {
         processQueue();
-        return set.last().get();
+        Reference<T> t = set.last();
+        if ( t != null ) return t.get();
+        return null;
     }
     
 }

Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java Mon Oct  3 08:24:23 2011
@@ -28,12 +28,10 @@ import java.util.Queue;
  */
 public class ReferencedQueue<T> extends ReferenceCollection<T> implements Queue<T> {
     private final Queue<Reference<T>> queue;
-    private final Ref type;
     
     public ReferencedQueue( Queue<Reference<T>> queue, Ref type){
         super(queue, type);
         this.queue = queue;
-        this.type = type;
     }
     public boolean offer(T e) {
         processQueue();
@@ -43,22 +41,30 @@ public class ReferencedQueue<T> extends 
 
     public T remove() {
         processQueue();
-        return queue.remove().get();
+        Reference<T> t = queue.remove();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T poll() {
         processQueue();
-        return queue.poll().get();
+        Reference<T> t = queue.poll();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T element() {
         processQueue();
-        return queue.element().get();
+        Reference<T> t = queue.element();
+        if ( t != null ) return t.get();
+        return null;
     }
 
     public T peek() {
         processQueue();
-        return queue.peek().get();
+        Reference<T> t = queue.peek();
+        if ( t != null ) return t.get();
+        return null;
     }
     
 }