You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2015/09/10 08:59:30 UTC

svn commit: r1702174 [2/5] - in /river/jtsk/skunk/qa-refactor-namespace/trunk: ./ qa/src/org/apache/river/test/impl/outrigger/javaspace05/ src/manifest/ src/net/jini/core/entry/ src/net/jini/core/lookup/ src/net/jini/entry/ src/net/jini/loader/ src/net...

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,607 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The purpose of this class is to implement all the possible interfaces
+ * that subclasses of ReferenceCollection may implement.  This is designed
+ * to fix the readResolve issue that occurs in de-serialised object graphs 
+ * containing circular references.
+ * 
+ * @author Peter Firmstone.
+ */
+abstract class ReadResolveFixCollectionCircularReferences<T> extends SerializationOfReferenceCollection<T>
+implements List<T>, Set<T>, SortedSet<T>, NavigableSet<T> , 
+Queue<T>, Deque<T>, BlockingQueue<T>, BlockingDeque<T>{
+   
+    // This abstract class must not hold any serial data.
+    
+    // Builder created List on deserialization
+    private volatile Collection<T> serialBuilt = null;
+    private volatile boolean built = false;
+   
+    ReadResolveFixCollectionCircularReferences(){}
+    
+    @Override
+    Collection<T> build() throws InstantiationException, IllegalAccessException,
+    ObjectStreamException {
+        if (isBuilt()) return getSerialBuilt();
+        setBuilt();
+        /* Traverse Inheritance heirarchy in reverse order */
+        if ( BlockingDeque.class.isAssignableFrom(getClazz()))
+            return RC.blockingDeque((BlockingDeque<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( BlockingQueue.class.isAssignableFrom(getClazz()))
+            return RC.blockingQueue((BlockingQueue<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( Deque.class.isAssignableFrom(getClazz()))
+            return RC.deque((Deque<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( Queue.class.isAssignableFrom(getClazz()))
+            return RC.queue((Queue<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( List.class.isAssignableFrom(getClazz()) )
+            return RC.list((List<Referrer<T>> ) getCollection(), getType(), 10000L);
+        if ( NavigableSet.class.isAssignableFrom(getClazz()) )
+            return RC.navigableSet((NavigableSet<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( SortedSet.class.isAssignableFrom(getClazz()) )
+            return RC.sortedSet((SortedSet<Referrer<T>>) getCollection(), getType(), 10000L);
+        if ( Set.class.isAssignableFrom(getClazz())) 
+            return RC.set((Set<Referrer<T>>) getCollection(), getType(), 10000L);
+        return RC.collection(getCollection(), getType(), 10000L);
+    }
+    
+    /**
+     * @serialData 
+     * @return the type
+     */
+    abstract Ref getType();
+
+    /**
+     * @serialData
+     * @return the collection
+     */
+    abstract Collection<Referrer<T>> getCollection();
+
+    /**
+     * @serialData
+     * @return the class
+     */
+    abstract Class getClazz();
+
+    /**
+     * @return the serialBuilt
+     */
+    Collection<T> getSerialBuilt() {
+        return serialBuilt;
+    }
+
+    /**
+     * @param serialBuilt the serialBuilt to set
+     */
+    Collection<T> setSerialBuilt(Collection<T> serialBuilt) {
+        this.serialBuilt = serialBuilt;
+        return serialBuilt;
+    }
+
+    /**
+     * @return the built
+     */
+    boolean isBuilt() {
+        return built;
+    }
+
+    /**
+     * 
+     */
+    void setBuilt() {
+        built = true;
+    }
+
+    
+        @Override
+    public int hashCode() {
+        if ( getSerialBuilt() instanceof List || getSerialBuilt() instanceof Set ){
+            return getSerialBuilt().hashCode();
+        }
+        return System.identityHashCode(this);
+    }
+    
+    /**
+     * Because equals and hashCode are not defined for collections, we 
+     * cannot guarantee consistent behaviour by implementing equals and
+     * hashCode.  A collection could be a list, set, queue or deque.
+     * So a List != Queue and a Set != list. therefore equals for collections is
+     * not defined.
+     * 
+     * However since two collections may both also be Lists, while abstracted
+     * from the client two lists may still be equal.
+     * 
+     * Unfortunately this object, when behaving as a delegate, is not always 
+     * equal to the object it is trying to represent.
+     * 
+     * @see Collection#equals(java.lang.Object) 
+     */
+    
+    @Override
+    public boolean equals(Object o){
+        if ( o == this ) return true;
+        if ( getSerialBuilt() instanceof List || getSerialBuilt() instanceof Set ){
+            return getSerialBuilt().equals(o);
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<T> iterator(){
+        if (getSerialBuilt() != null) return getSerialBuilt().iterator();
+        return new NullIterator<T>();
+    }
+    
+    @Override
+    public int size() {
+        if (getSerialBuilt() != null) return getSerialBuilt().size();
+        return 0;
+    }
+    
+    public boolean add(T t){
+        if (getSerialBuilt() != null) return getSerialBuilt().add(t);
+        return false;
+    }
+    
+    private void readObject(ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+    }
+    
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    // If deserialized state may have changed since, if another type of
+    // Map, apart from ImmutableMap, uses the same builder for example.
+    final Object writeReplace() {
+        if ( isBuilt()) return getSerialBuilt();
+        return this;
+    }
+
+    final Object readResolve() throws ObjectStreamException{
+        try {
+            return setSerialBuilt(build());
+        } catch (InstantiationException ex) {
+            throw new InvalidClassException(this.getClass().toString(), ex.fillInStackTrace().toString());
+        } catch (IllegalAccessException ex) {
+            throw new InvalidClassException(this.getClass().toString(), ex.fillInStackTrace().toString());
+        }
+    }
+
+    
+    public boolean addAll(int index, Collection<? extends T> c) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).addAll(index, c);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T get(int index) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).get(index);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T set(int index, T element) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).set(index, element);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void add(int index, T element) {
+        if (getSerialBuilt() instanceof List)((List<T>) getSerialBuilt()).add(index, element);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T remove(int index) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).remove(index);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public int indexOf(Object o) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).indexOf(o);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public int lastIndexOf(Object o) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).lastIndexOf(o);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public ListIterator<T> listIterator() {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).listIterator();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public ListIterator<T> listIterator(int index) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).listIterator(index);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public List<T> subList(int fromIndex, int toIndex) {
+        if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).subList(fromIndex, toIndex);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public Comparator<? super T> comparator() {
+        if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).comparator();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public SortedSet<T> subSet(T fromElement, T toElement) {
+        if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).subSet(fromElement, toElement);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public SortedSet<T> headSet(T toElement) {
+        if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).headSet(toElement);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public SortedSet<T> tailSet(T fromElement) {
+        if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).tailSet(fromElement);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T first() {
+        if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).first();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T last() {
+        if (getSerialBuilt() instanceof SortedSet) 
+            return ((SortedSet<T>) getSerialBuilt()).last();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T lower(T e) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).lower(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T floor(T e) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).floor(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T ceiling(T e) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).ceiling(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T higher(T e) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).higher(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T pollFirst() {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).pollFirst();
+        if (getSerialBuilt() instanceof Deque) 
+            return ((Deque<T>) getSerialBuilt()).pollFirst();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T pollLast() {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).pollLast();
+        if (getSerialBuilt() instanceof Deque) 
+            return ((Deque<T>) getSerialBuilt()).pollLast();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public NavigableSet<T> descendingSet() {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).descendingSet();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public Iterator<T> descendingIterator() {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).descendingIterator();
+        if (getSerialBuilt() instanceof Deque) 
+            return ((Deque<T>) getSerialBuilt()).descendingIterator();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public NavigableSet<T> subSet(T fromElement, boolean fromInclusive, T toElement, boolean toInclusive) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).subSet(fromElement, fromInclusive, toElement, toInclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public NavigableSet<T> headSet(T toElement, boolean inclusive) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).headSet(toElement, inclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public NavigableSet<T> tailSet(T fromElement, boolean inclusive) {
+        if (getSerialBuilt() instanceof NavigableSet) 
+            return ((NavigableSet<T>) getSerialBuilt()).tailSet(fromElement, inclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offer(T e) {
+        if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).offer(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T remove() {
+        if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).remove();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T poll() {
+        if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).poll();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T element() {
+        if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).element();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T peek() {
+        if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).peek();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void addFirst(T e) {
+        if (getSerialBuilt() instanceof Deque) ((Deque<T>) getSerialBuilt()).addFirst(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void addLast(T e) {
+        if (getSerialBuilt() instanceof Deque) ((Deque<T>) getSerialBuilt()).addLast(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offerFirst(T e) {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).offerFirst(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offerLast(T e) {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).offerLast(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T removeFirst() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).removeFirst();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T removeLast() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).removeLast();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T getFirst() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).getFirst();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T getLast() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).getLast();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T peekFirst() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).peekFirst();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T peekLast() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).peekLast();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean removeFirstOccurrence(Object o) {
+        if (getSerialBuilt() instanceof Deque) 
+            return ((Deque<T>) getSerialBuilt()).removeFirstOccurrence(o);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean removeLastOccurrence(Object o) {
+        if (getSerialBuilt() instanceof Deque) 
+            return ((Deque<T>) getSerialBuilt()).removeLastOccurrence(o);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void push(T e) {
+        if (getSerialBuilt() instanceof Deque)((Deque<T>) getSerialBuilt()).push(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T pop() {
+        if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).pop();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void put(T e) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingQueue) ((BlockingQueue<T>) getSerialBuilt()).put(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).offer(e, timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T take() throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).take();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).poll(timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public int remainingCapacity() {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).remainingCapacity();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public int drainTo(Collection<? super T> c) {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).drainTo(c);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        if (getSerialBuilt() instanceof BlockingQueue) 
+            return ((BlockingQueue<T>) getSerialBuilt()).drainTo(c, maxElements);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void putFirst(T e) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            ((BlockingDeque<T>) getSerialBuilt()).putFirst(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public void putLast(T e) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            ((BlockingDeque<T>) getSerialBuilt()).putLast(e);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offerFirst(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).offerFirst(e, timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public boolean offerLast(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).offerLast(e, timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T takeFirst() throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).takeFirst();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T takeLast() throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).takeLast();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).pollFirst(timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    
+    public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+        if (getSerialBuilt() instanceof BlockingDeque) 
+            return ((BlockingDeque<T>) getSerialBuilt()).pollLast(timeout, unit);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+ 
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,316 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ *
+ * @author peter
+ */
+abstract class ReadResolveFixForMapCircularReferences<K,V> 
+    extends SerializationOfReferenceMap<K,V> 
+    implements Map<K,V>, SortedMap<K,V>, NavigableMap<K,V>, ConcurrentMap<K,V>, 
+                                                ConcurrentNavigableMap<K,V>{
+
+    // Builder created Map on deserialization
+    private volatile Map<K,V> map = null;
+    private volatile boolean built = false;
+    
+    @Override
+    public Comparator<? super K> comparator() {
+        if (getMap() instanceof SortedMap) return ((SortedMap<K,V>) getMap()).comparator();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
+        if (getMap() instanceof SortedMap) return ((ConcurrentNavigableMap<K,V>) getMap()).subMap(fromKey, toKey);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> headMap(K toKey) {
+        if (getMap() instanceof SortedMap) return ((ConcurrentNavigableMap<K,V>) getMap()).headMap(toKey);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
+        if (getMap() instanceof SortedMap) return ((ConcurrentNavigableMap<K,V>) getMap()).tailMap(fromKey);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K firstKey() {
+        if (getMap() instanceof SortedMap) return ((SortedMap<K,V>) getMap()).firstKey();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K lastKey() {
+        if (getMap() instanceof SortedMap) return ((SortedMap<K,V>) getMap()).lastKey();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    public NavigableSet<K> keySet() {
+        if (getMap() instanceof SortedMap) return ((ConcurrentNavigableMap<K,V>) getMap()).keySet();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Collection<V> values() {
+        if (getMap() instanceof SortedMap) return ((SortedMap<K,V>) getMap()).values();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        if (getMap() instanceof SortedMap) return ((SortedMap<K,V>) getMap()).entrySet();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public int size() {
+        return getMap().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return getMap().isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return getMap().containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return getMap().containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return getMap().get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return getMap().put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        return getMap().remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        getMap().putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        getMap().clear();
+    }
+
+    @Override
+    public Entry<K, V> lowerEntry(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).lowerEntry(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K lowerKey(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).lowerKey(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> floorEntry(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).floorEntry(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K floorKey(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).floorKey(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> ceilingEntry(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).ceilingEntry(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K ceilingKey(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).ceilingKey(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> higherEntry(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).higherEntry(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public K higherKey(K key) {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).higherKey(key);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> firstEntry() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).firstEntry();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> lastEntry() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).lastEntry();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> pollFirstEntry() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).pollFirstEntry();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public Entry<K, V> pollLastEntry() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).pollLastEntry();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> descendingMap() {
+        if (getMap() instanceof NavigableMap) return ((ConcurrentNavigableMap<K,V>) getMap()).descendingMap();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public NavigableSet<K> navigableKeySet() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).navigableKeySet();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public NavigableSet<K> descendingKeySet() {
+        if (getMap() instanceof NavigableMap) return ((NavigableMap<K,V>) getMap()).descendingKeySet();
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+        if (getMap() instanceof NavigableMap) 
+            return ((ConcurrentNavigableMap<K,V>) getMap()).subMap(fromKey, fromInclusive, toKey, toInclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
+        if (getMap() instanceof NavigableMap) 
+            return ((ConcurrentNavigableMap<K,V>) getMap()).headMap(toKey, inclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
+        if (getMap() instanceof NavigableMap) 
+            return ((ConcurrentNavigableMap<K,V>) getMap()).tailMap(fromKey, inclusive);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        if (getMap() instanceof ConcurrentMap) 
+            return ((ConcurrentMap<K,V>) getMap()).putIfAbsent(key, value);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        if (getMap() instanceof ConcurrentMap) 
+            return ((ConcurrentMap<K,V>) getMap()).remove(key, value);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        if (getMap() instanceof ConcurrentMap) 
+            return ((ConcurrentMap<K,V>) getMap()).replace(key, oldValue, newValue);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        if (getMap() instanceof ConcurrentMap) 
+            return ((ConcurrentMap<K,V>) getMap()).replace(key, value);
+        throw new UnsupportedOperationException("Unsupported Interface Method.");
+    }
+
+    /**
+     * @return the map
+     */
+    public Map<K,V> getMap() {
+        return map;
+    }
+
+    /**
+     * @return the built
+     */
+    public boolean isBuilt() {
+        return built;
+    }
+
+    @Override
+    Map<K, V> build() throws InstantiationException, IllegalAccessException, ObjectStreamException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+    /**
+     * @serialData 
+     * @return the type
+     */
+    abstract Ref getType();
+
+    /**
+     * @serialData
+     * @return the collection
+     */
+    abstract Map<Referrer<K>,Referrer<V>> getRefMap();
+
+    /**
+     * @serialData
+     * @return the class
+     */
+    abstract Class getClazz();
+    
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,193 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * Ref enum represents types of references available for use in java
+ * collection framework implementations.
+ * </p><p>
+ * Only use STRONG, WEAK_IDENTITY and TIME based references as keys
+ * in Maps, use of other reference types should be discouraged
+ * due to unpredictable results, when for example, an equal WEAK
+ * Reference Key disappears due to garbage collection, or SOFT reference keys
+ * aren't garbage collected as expected.
+ * </p><p>
+ * Map implementations delete their key -> value mapping when either the 
+ * key or value References become unreachable. ConcurrentMap's will retry until
+ * putIfAbsent is successful when an existing Referrer key is cleared.
+ * </p><p>
+ * Only use STRONG and TIME based references in Queue's, other types break
+ * Queue's contract, null has special meaning, a cleared reference returns
+ * null.
+ * </p><p>
+ * Object.toString() is overridden in reference implementations to call
+ * toString() on the referent, if still reachable, otherwise the reference
+ * calls the superclass toString() method, where the superclass is a java 
+ * Reference subclass.  Consideration is being given to returning a null string
+ * "" instead, if you feel strongly about this, please contact the author.
+ * </p><p>
+ * Phantom references are not used, they are designed to replace
+ * {@link Object#finalize() } and remain unreachable, but not garbage collected until
+ * the {@link PhantomReference} also becomes unreachable, get() always returns
+ * null.
+ * </p><p>
+ * TIME and SOFT and SOFT_IDENTITY references update their access timestamp, 
+ * when Referrer.get(), Referrer.equals() or Referrer.toString() is called.  
+ * SOFT and SOFT_IDENTITY do so lazily and are not guaranteed to
+ * succeed in updating the access time.  SOFT references also update their
+ * access timestamp when either Referrer.hashCode() or Comparable.compareTo() 
+ * is called.
+ * </p><p>
+ * For sets and map keys that require traversal using a Comparator or
+ * Comparable, access times will be updated during each traversal. In
+ * these circumstances, SOFT and SOFT_IDENTITY references are typically not enqueued and
+ * do not behave as expected. 
+ * </p><p>
+ * SOFT references are only suited for use in lists or as values in maps and not
+ * suitable for keys in hash or tree maps.
+ * </p><p>
+ * TIME references only update their access timestamp during traversal when
+ * a Comparator or Comparable returns zero (a successful match), or when equals is true.
+ * TIME references are suitable for use as keys in tree and hash based maps 
+ * as well as sets and tasks in Queues.  In fact TIME is the only referrer
+ * suitable for use in Queue's and their subtypes.  Tasks in a Queue, if
+ * timed out, are first cancelled, then removed, they are not cleared as
+ * doing so could cause a null poll() return when a queue is not empty,
+ * which would violate the contract for Queue.
+ * </p><p>
+ * SOFT_IDENTITY references are suitable for use as
+ * keys in hash tables, hash maps and hash sets, since hashCode() does not update
+ * the access timestamp, while equals() does.  SOFT_IDENTITY references are not
+ * recommended for use in tree maps, tree sets or queues.
+ * </p>
+ * @see Reference
+ * @see WeakReference
+ * @see SoftReference
+ * @see PhantomReference
+ * @see Comparable
+ * @author Peter Firmstone.
+ */
+public enum Ref {
+    /**
+     * <P>
+     * TIME References implement equals based on equality of the referent
+     * objects.  Time references are STRONG references that are removed
+     * after a period of no access, even if they are strongly
+     * referenced outside the cache, they are removed.  TIME references don't
+     * rely on Garbage Collection algorithms.
+     * </P><P>
+     * TIME References support cancellation of tasks implementing Future, when
+     * the reference times out, if it contains a Future, it is cancelled.
+     * </P><P>
+     * A call to equals(), get() or toString(), will cause the timestamp on
+     * the reference to be updated, whereas hashCode() will not, in addition,
+     * Comparators and referents that implement Comparable, only update the
+     * timestamp if they return 0.  This allows referents to be inspected
+     * without update when they don't match.
+     * </P><P>
+     * TIME References require synchronisation for iteration, so during
+     * cleaning periods, a synchronised Collection or Map will be locked.
+     * A lock is still obtained for iterating over Concurrent Maps and 
+     * Collections, however this does not normally synchronise access between threads,
+     * only other cleaning task threads.
+     * </P>
+     * @see Future
+     * 
+     */
+    TIME,
+    /**
+     * <P>
+     * SOFT References implement equals based on equality of the referent 
+     * objects, while the referent is still reachable.  The hashCode
+     * implementation is based on the referent implementation of hashCode,
+     * while the referent is reachable.
+     * </P><P>
+     * SOFT References implement Comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Objects don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * </P><P>
+     * Garbage collection must be the same as SoftReference.
+     * </P>
+     * @see SoftReference
+     * @see WeakReference
+     * @see Comparable
+     */
+    SOFT,
+    /**
+     * <P>
+     * SOFT_IDENTY References implement equals based on identity == of the
+     * referent objects.
+     * </P><P>
+     * Garbage collection must be the same as SoftReference.
+     * </P>
+     * @see SoftReference
+     */
+    SOFT_IDENTITY,
+    /**
+     * <P>
+     * WEAK References implement equals based on equality of the referent 
+     * objects, while the referent is still reachable.  The hashCode
+     * implementation is based on the referent implementation of hashCode,
+     * while the referent is reachable.
+     * </P><P>
+     * WEAK References implement comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Object's don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * </P><P>
+     * Garbage collection must be the same as WeakReference.
+     * </P>
+     * @see WeakReference
+     * @see Comparable
+     */
+    WEAK,
+    /**
+     * <P>
+     * WEAK_IDENTY References implement equals based on identity == of the
+     * referent objects.
+     * </P><P>
+     * Garbage collection must be the same as WeakReference.
+     * </P>
+     * @see WeakReference
+     */
+    WEAK_IDENTITY,
+    /**
+     * <P>
+     * STRONG References implement equals and hashCode() based on the 
+     * equality of the underlying Object.
+     * </P><P>
+     * STRONG References implement Comparable allowing the referent Objects
+     * to be compared if they implement Comparable.  If the referent Object
+     * doesn't implement Comparable, the hashCode's of the Reference is 
+     * compared instead.  If the referent Object's don't implement Comparable,
+     * then they shouldn't really be used in sorted collections.
+     * </P><P>
+     * Garbage collection doesn't occur unless the Reference is cleared.
+     * </P>
+     * @see Comparable
+     */
+    STRONG
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2012 peter.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.concurrent;
+
+/**
+ * This interface defines the only ReferenceQueue method used,
+ * to not depend directly on ReferenceQueue, allowing the use of other Queue
+ * implementations.
+ * 
+ * @param <T> 
+ * @author peter
+ */
+interface RefQueue<T> {
+    public Object poll();
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2012 peter.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.concurrent;
+
+import java.lang.ref.ReferenceQueue;
+
+/**
+ * A ReferenceQueue that implements RefQueue
+ * 
+ * @author Peter Firmstone
+ */
+class RefReferenceQueue<T> extends ReferenceQueue<T> implements RefQueue<T>{}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,156 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingDeque<T> extends ReferenceDeque<T> implements BlockingDeque<T>{
+    private static final long serialVersionUID = 1L;
+    
+    private final BlockingDeque<Referrer<T>> deque;
+    
+    ReferenceBlockingDeque(BlockingDeque<Referrer<T>> deque, Ref type, boolean gcThreads, long gcCycle){
+        super(deque, type, gcThreads, gcCycle);
+        this.deque = deque;
+    }
+    
+    private void readObject(ObjectInputStream stream) 
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+    public void putFirst(T e) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        deque.putFirst(r);
+    }
+
+
+    public void putLast(T e) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        deque.putLast(r);
+    }
+
+
+    public boolean offerFirst(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        return deque.offerFirst(r, timeout, unit);
+    }
+
+
+    public boolean offerLast(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        return deque.offerLast(r, timeout, unit);
+    }
+
+
+    public T takeFirst() throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.takeFirst();
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public T takeLast() throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.takeLast();
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.pollFirst(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.pollLast(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        deque.put(r);
+    }
+
+
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        return deque.offer(r,timeout, unit);
+    }
+
+
+    public T take() throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.take();
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> t = deque.poll(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+
+    public int remainingCapacity() {
+        return deque.remainingCapacity();
+    }
+
+
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Referrer<T>> dr = new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+        return deque.drainTo(dr);
+        }
+
+
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Referrer<T>> drain = new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+        return deque.drainTo(drain, maxElements);
+        }
+    }

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,92 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * @param <T> 
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingQueue<T> extends ReferencedQueue<T> implements BlockingQueue<T> {
+    private static final long serialVersionUID = 1L;
+    private final BlockingQueue<Referrer<T>> queue;
+    
+    ReferenceBlockingQueue(BlockingQueue<Referrer<T>> queue, Ref type, boolean gcThreads, long gcCycle){
+        super(queue, type, gcThreads, gcCycle);
+        this.queue = queue;
+    }
+    
+    private void readObject(ObjectInputStream stream) 
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        queue.put(r);
+    }
+
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> r = wrapObj(e, true, false);
+        return queue.offer(r, timeout, unit);
+    }
+
+    public T take() throws InterruptedException {
+        processQueue();
+        Referrer<T> t = queue.take();
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        Referrer<T> t = queue.poll(timeout, unit);
+        if ( t != null ) return t.get();
+        return null;
+    }
+
+    public int remainingCapacity() {
+        processQueue();
+        return queue.remainingCapacity();
+    }
+
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Referrer<T>> dr = new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+        return queue.drainTo(dr);   
+        }
+
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        @SuppressWarnings("unchecked")
+        Collection<Referrer<T>> drain = new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+        return queue.drainTo(drain, maxElements);
+        }
+    
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,207 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.io.WriteAbortedException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Collection of Reference Objects, the developer may chose any Collection
+ * implementation to store the References, which is passed in a runtime.
+ * 
+ * The underlying Collection implementation governs the specific behaviour of this
+ * Collection.
+ * 
+ * Synchronisation must be implemented by the underlying Collection and cannot
+ * be performed externally to this class.  The underlying Collection must
+ * also be mutable.  Objects will be removed automatically from the underlying
+ * Collection when they are eligible for garbage collection.
+ * 
+ * Weak, Weak Identity, Soft, Soft Identity or Strong references may be used.
+ * This Collection may be used as an Object pool cache or any other purpose 
+ * that requires unique memory handling.
+ * 
+ * For concurrent threads, it is recommended to encapsulate the underlying
+ * collection in a multi read, single write collection for scalability.
+ * 
+ * @see Ref
+ * @see ConcurrentCollections#multiReadCollection(java.util.Collection) 
+ * @author Peter Firmstone.
+ */
+class ReferenceCollection<T> extends AbstractCollection<T> 
+                                implements Collection<T>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Collection<Referrer<T>> col;
+    private final ReferenceQueuingFactory<T, Referrer<T>> rqf;
+    private final Ref type;
+    
+    @SuppressWarnings("unchecked")
+    ReferenceCollection(Collection<Referrer<T>> col, Ref type, boolean gcThread, long gcCycle){
+        RefQueue<T> que = null;
+        if (type == Ref.TIME) que = new TimedRefQueue();
+        else if (type != Ref.STRONG) que = new RefReferenceQueue<T>();
+        this.col = col;
+        ReferenceProcessor<T> rp = new ReferenceProcessor<T>(col, type, que, gcThread, col);
+        this.type = type;
+        rqf = rp;
+        rp.start(gcCycle);
+    }
+    
+    ReferenceCollection(Collection<Referrer<T>> col, 
+            ReferenceQueuingFactory<T, Referrer<T>> rqf, Ref type){
+        this.col = col;
+        this.rqf = rqf;
+        this.type = type;
+    }
+    
+    void processQueue(){
+        //rqf.processQueue();
+        }
+    
+    ReferenceQueuingFactory<T, Referrer<T>> getRQF(){
+        return rqf;
+    }
+    
+    Ref getRef(){
+        return type;
+    }
+    
+    Referrer<T> wrapObj(T t, boolean enqueue, boolean temporary){
+        return rqf.referenced(t, enqueue, temporary);
+    }
+    
+    public int size() {
+        processQueue();
+        return col.size();
+    }
+
+    public boolean isEmpty() {
+        processQueue();
+        return col.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        processQueue();
+        return col.contains(wrapObj((T) o, false, true));
+    }
+    
+    /**
+     * This Iterator may return null values if garbage collection
+     * runs during iteration.
+     * 
+     * Always check for null values.
+     * 
+     * @return T - possibly null.
+     */
+    public Iterator<T> iterator() {
+        processQueue();
+        return new ReferenceIterator<T>(col.iterator());
+    }
+
+    public boolean add(T e) {
+        processQueue();
+        return col.add(wrapObj(e, true, false));
+    }
+
+    public boolean remove(Object o) {
+        processQueue();
+        return col.remove(wrapObj((T) o, false, true));
+    }
+
+ 
+    @SuppressWarnings("unchecked")
+    public boolean containsAll(Collection<?> c) {
+        processQueue();
+        return col.containsAll(new CollectionDecorator<T>((Collection<T>) c, getRQF(), false, true));
+    }
+
+    
+    @SuppressWarnings("unchecked")
+    public boolean addAll(Collection<? extends T> c) {
+        processQueue();
+        return col.addAll(new CollectionDecorator<T>((Collection<T>) c, getRQF(), true, false));
+    }
+
+    public void clear() {
+        col.clear();
+    }
+    
+    /*
+     * The next three methods are suitable implementations for subclasses also.
+     */
+    public String toString(){
+        return col.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        if ( col instanceof List || col instanceof Set ){
+            return col.hashCode();
+        }
+        return System.identityHashCode(this);
+    }
+    
+    /**
+     * Because equals and hashCode are not defined for collections, we 
+     * cannot guarantee consistent behaviour by implementing equals and
+     * hashCode.  A collection could be a list, set, queue or deque.
+     * So a List != Queue and a Set != list. therefore equals for collections is
+     * not defined.
+     * 
+     * However since two collections may both also be Lists, while abstracted
+     * from the client two lists may still be equal.
+     * @see Collection#equals(java.lang.Object) 
+     */
+    
+    @Override
+    public boolean equals(Object o){
+        if ( o == this ) return true;
+        if ( col instanceof List || col instanceof Set ){
+            return col.equals(o);
+        }
+        return false;
+    }
+    
+    final Object writeReplace() throws ObjectStreamException {
+        try {
+            // returns a Builder instead of this class.
+            return SerializationOfReferenceCollection.create(getClass(), col, type );
+        } catch (InstantiationException ex) {
+            throw new WriteAbortedException("Unable to create serialization proxy", ex);
+        } catch (IllegalAccessException ex) {
+            throw new WriteAbortedException("Unable to create serialization proxy", ex);
+        }
+    }
+    
+    private void readObject(ObjectInputStream stream) 
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,59 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ *
+ * @author peter
+ */
+abstract class ReferenceCollectionRefreshAfterSerialization<T> 
+                            extends ReadResolveFixCollectionCircularReferences<T> {
+
+    ReferenceCollectionRefreshAfterSerialization() {
+    }
+
+    @Override
+    final Collection<T> build() throws InstantiationException, IllegalAccessException, ObjectStreamException {
+        Collection<T> result = super.build();
+        /* What if the underlying collection is immutable?
+         * The ReferenceQueuingFactory is unknown until the ReferenceCollection
+         * has been built.
+         */
+        if (result instanceof ReferenceCollection) {
+            ReferenceCollection<T> refCol = (ReferenceCollection<T>) result;
+            ReferenceQueuingFactory<T, Referrer<T>> rqf = refCol.getRQF();
+            Iterator<Referrer<T>> colIt = getCollection().iterator();
+            while (colIt.hasNext()) {
+                Referrer<T> ref = colIt.next();
+                if (ref == null) {
+                    continue;
+                }
+                if (ref instanceof AbstractReferrerDecorator) {
+                    ((AbstractReferrerDecorator<T>) ref).refresh(rqf);
+                } else {
+                    throw new InvalidObjectException("Referrer's must be an AbstractReferrerWraper for ReferenceCollection");
+                }
+            }
+        }
+        return result;
+    }
+    
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,88 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ *This class is the serial form of ReferenceCollection and all it's subclasses.
+ * 
+ * While the serial form of this class will remain compatible with itself,
+ * ReferenceCollection may replace this implementation with another
+ * at some point in the future.
+ * 
+ * This class will still be able to de-serialise into a ReferenceCollection.
+ * 
+ * @author peter
+ */
+ class ReferenceCollectionSerialData<T> 
+    extends ReferenceCollectionRefreshAfterSerialization<T> implements Serializable {
+    private static final long serialVersionUID = 1L;
+    
+    /** @serialField  */
+    private Ref type;
+    /** @serialField */
+    private Collection<Referrer<T>> collection;
+    /** @serialField */
+    private Class referenceCollectionClass;
+
+    @SuppressWarnings("unchecked")
+   ReferenceCollectionSerialData( Class clazz,
+            Collection<Referrer<T>> underlyingCollection, Ref type) 
+           throws InstantiationException, IllegalAccessException{
+        // Create a new instance of the underlying collection and
+        // add all objects.
+        if ( clazz == null || underlyingCollection == null || type == null){
+            throw new NullPointerException("null parameters prohibited");
+        }
+        this.collection = underlyingCollection;
+        this.type = type;
+        this.referenceCollectionClass = clazz;
+   }
+    
+    @Override
+    public Ref getType() {
+        return type;
+    }
+
+    @Override
+    public Collection<Referrer<T>> getCollection() {
+        return collection;
+    }
+
+    @Override
+    public Class getClazz() {
+        return referenceCollectionClass;
+    }
+
+     private void readObject(ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if ( referenceCollectionClass == null || collection == null || type == null){
+            throw new InvalidObjectException("null fields found after deserialization");
+        }
+    }
+    
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+    
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,52 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ *
+ * @param <T> 
+ * @author Peter Firmstone.
+ */
+class ReferenceComparator<T> extends AbstractReferenceComparator<T> implements  Serializable{
+    private static final long serialVersionUID = 1L;
+    private Comparator<? super T> comparator;
+    ReferenceComparator(Comparator<? super T> comparator){
+        if ( comparator == null ) throw new IllegalArgumentException("Null value prohibited");
+        this.comparator = comparator;
+    }
+    
+    Comparator<? super T> get(){
+        return comparator;
+    }
+    
+    private void readObject(ObjectInputStream in)
+        throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        if ( get() == null ) throw new InvalidObjectException("Null value prohibited");
+    }
+    
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,107 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+
+import java.lang.ref.Reference;
+import java.util.Random;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A referenced hash map, that encapsulates and utilises any ConcurrentMap
+ * implementation passed in at construction.
+ * 
+ * Based on any ConcurrentMap implementation, it doesn't accept null keys or values.
+ *
+ * It is recommended although not mandatory to use identity based References for keys,
+ * unexpected results occur when relying on equal keys, if one key is no longer 
+ * strongly reachable and has been garbage collected and removed from the 
+ * Map.
+ * 
+ * 
+ * 
+ * If either a key or value, is no longer strongly reachable, their mapping
+ * will be queued for removal and garbage collection, in compliance with
+ * the Reference implementation selected.
+ *
+ * @param <K> 
+ * @param <V> 
+ * @see Ref
+ * @author Peter Firmstone.
+ *
+ * @since 2.3
+ */
+class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V> implements ConcurrentMap<K, V> {
+
+    // ConcurrentMap must be protected from null values?  It changes it's behaviour, is that a problem?
+    private final ConcurrentMap<Referrer<K>, Referrer<V>> map;
+    
+    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>,Referrer<V>> map, Ref key, Ref val, boolean gcThreads, long gcKeyCycle, long gcValCycle){
+        super (map, key, val, gcThreads, gcKeyCycle, gcValCycle);
+        this.map = map;
+    }
+    
+    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>, Referrer<V>> map,
+            ReferenceQueuingFactory<K, Referrer<K>> krqf, ReferenceQueuingFactory<V, Referrer<V>> vrqf, Ref key, Ref val){
+        super(map, krqf, vrqf, key, val);
+        this.map = map;
+    }
+    
+    public V putIfAbsent(K key, V value) {
+        processQueue();  //may be a slight delay before atomic putIfAbsent
+        Referrer<K> k = wrapKey(key, true, false);
+        Referrer<V> v = wrapVal(value, true, false);
+        Referrer<V> val = map.putIfAbsent(k, v);
+        while ( val != null ) {
+            V existed = val.get();
+            // We hold a strong reference to value, so 
+            if ( existed == null ){
+                // stale reference must be replaced, it has been garbage collect but hasn't 
+                // been removed, we must treat it like the entry doesn't exist.
+                if ( map.replace(k, val, v)){
+                    // replace successful
+                    return null; // Because officially there was no record.
+                } else {
+                    // Another thread may have replaced it.
+                    val = map.putIfAbsent(k, v);
+                }
+            } else {
+                return existed;
+            }
+        }
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean remove(Object key, Object value) {
+        processQueue();
+        return map.remove(wrapKey((K) key, false, true), wrapVal((V) value, false, true));
+    }
+
+    public boolean replace(K key, V oldValue, V newValue) {
+        processQueue();
+        return map.replace(wrapKey(key, false, true), wrapVal(oldValue, false, true), wrapVal(newValue, true, false));
+    }
+
+    public V replace(K key, V value) {
+        processQueue();
+        Referrer<V> val = map.replace(wrapKey(key, false, true), wrapVal(value, true, false));
+        if ( val != null ) return val.get();
+        return null;
+    }
+}

Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,242 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ *
+ * @param <K> 
+ * @param <V> 
+ * @author Peter Firmstone.
+ */
+class ReferenceConcurrentNavigableMap<K,V> 
+extends ReferenceConcurrentMap<K,V> implements ConcurrentNavigableMap<K,V>{
+    private final ConcurrentNavigableMap<Referrer<K>,Referrer<V>> map;
+    
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Referrer<K>, Referrer<V>> map, Ref keyRef, Ref valRef, boolean gcThreads, long gcKeyCycle, long gcValCycle){
+        super(map, keyRef, valRef, gcThreads, gcKeyCycle, gcValCycle);
+        this.map = map;
+    }
+
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Referrer<K>, Referrer<V>> map,
+            ReferenceQueuingFactory<K, Referrer<K>> krqf, ReferenceQueuingFactory<V, Referrer<V>> vrqf, Ref key, Ref val){
+        super(map, krqf, vrqf, key, val);
+        this.map = map;
+    }
+    
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.subMap(
+                wrapKey(fromKey, false, true), 
+                fromInclusive, 
+                wrapKey(toKey, false, true),
+                toInclusive
+            ), 
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.headMap(wrapKey(toKey, false, true), inclusive),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.tailMap(wrapKey(fromKey, false, true), inclusive),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.subMap(wrapKey(fromKey, false, true), wrapKey(toKey, false, true)),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.headMap(wrapKey(toKey, false, true)),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.tailMap(wrapKey(fromKey, false, true)),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> descendingMap() {
+        processQueue();
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            map.descendingMap(),
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    public NavigableSet<K> navigableKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF(), keyRef());
+    }
+
+    public NavigableSet<K> keySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.keySet(), getKeyRQF(), keyRef());
+    }
+
+    public NavigableSet<K> descendingKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF(), keyRef());
+    }
+
+    public Entry<K, V> lowerEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.lowerEntry(wrapKey(key, false, true)),
+            getValRQF()
+        );
+    }
+
+    public K lowerKey(K key) {
+        processQueue();
+        Referrer<K> k = map.lowerKey(wrapKey(key, false, true));
+        if ( k != null ) return k.get();
+        return null;
+    }
+
+    public Entry<K, V> floorEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.floorEntry(wrapKey(key, false, true)),
+            getValRQF()
+        );
+    }
+
+    public K floorKey(K key) {
+        processQueue();
+        Referrer<K> k = map.floorKey(wrapKey(key, false, true));
+        if ( k != null ) return k.get();
+        return null;
+    }
+
+    public Entry<K, V> ceilingEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.ceilingEntry(wrapKey(key, false, true)),
+            getValRQF()
+        );
+    }
+
+    public K ceilingKey(K key) {
+        processQueue();
+        Referrer<K> k = map.ceilingKey(wrapKey(key, false, true));
+        if ( k != null ) return k.get();
+        return null;
+    }
+
+    public Entry<K, V> higherEntry(K key) {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.higherEntry(wrapKey(key, false, true)),
+            getValRQF()
+        );
+    }
+
+    public K higherKey(K key) {
+        processQueue();
+        Referrer<K> k = map.higherKey(wrapKey(key, false, true));
+        if ( k != null ) return k.get();
+        return null;
+    }
+
+    public Entry<K, V> firstEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.firstEntry(),
+            getValRQF()
+        );
+    }
+
+    public Entry<K, V> lastEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.lastEntry(),
+            getValRQF()
+        );
+    }
+
+    public Entry<K, V> pollFirstEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.pollFirstEntry(),
+            getValRQF()
+        );
+    }
+
+    public Entry<K, V> pollLastEntry() {
+        processQueue();
+        return new ReferenceEntryFacade<K,V>(
+            map.pollLastEntry(),
+            getValRQF()
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    public Comparator<? super K> comparator() {
+        processQueue();
+        Comparator<? super Referrer<K>> c = map.comparator();
+        if ( c instanceof ReferenceComparator){
+            return ((ReferenceComparator) c).get();
+        }
+        return null;
+    }
+
+    public K firstKey() {
+        processQueue();
+        Referrer<K> k = map.firstKey();
+        if ( k != null ) return k.get();
+        return null;
+    }
+
+    public K lastKey() {
+        processQueue();
+        Referrer<K> k = map.lastKey();
+        if ( k != null ) return k.get();
+        return null;
+    }
+}