You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2015/09/10 08:59:30 UTC
svn commit: r1702174 [2/5] - in
/river/jtsk/skunk/qa-refactor-namespace/trunk: ./
qa/src/org/apache/river/test/impl/outrigger/javaspace05/ src/manifest/
src/net/jini/core/entry/ src/net/jini/core/lookup/ src/net/jini/entry/
src/net/jini/loader/ src/net...
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixCollectionCircularReferences.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,607 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NavigableSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The purpose of this class is to implement all the possible interfaces
+ * that subclasses of ReferenceCollection may implement. This is designed
+ * to fix the readResolve issue that occurs in de-serialised object graphs
+ * containing circular references.
+ *
+ * @author Peter Firmstone.
+ */
+abstract class ReadResolveFixCollectionCircularReferences<T> extends SerializationOfReferenceCollection<T>
+implements List<T>, Set<T>, SortedSet<T>, NavigableSet<T> ,
+Queue<T>, Deque<T>, BlockingQueue<T>, BlockingDeque<T>{
+
+ // This abstract class must not hold any serial data.
+
+ // Builder created List on deserialization
+ private volatile Collection<T> serialBuilt = null;
+ private volatile boolean built = false;
+
+ ReadResolveFixCollectionCircularReferences(){}
+
+ @Override
+ Collection<T> build() throws InstantiationException, IllegalAccessException,
+ ObjectStreamException {
+ if (isBuilt()) return getSerialBuilt();
+ setBuilt();
+ /* Traverse Inheritance heirarchy in reverse order */
+ if ( BlockingDeque.class.isAssignableFrom(getClazz()))
+ return RC.blockingDeque((BlockingDeque<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( BlockingQueue.class.isAssignableFrom(getClazz()))
+ return RC.blockingQueue((BlockingQueue<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( Deque.class.isAssignableFrom(getClazz()))
+ return RC.deque((Deque<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( Queue.class.isAssignableFrom(getClazz()))
+ return RC.queue((Queue<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( List.class.isAssignableFrom(getClazz()) )
+ return RC.list((List<Referrer<T>> ) getCollection(), getType(), 10000L);
+ if ( NavigableSet.class.isAssignableFrom(getClazz()) )
+ return RC.navigableSet((NavigableSet<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( SortedSet.class.isAssignableFrom(getClazz()) )
+ return RC.sortedSet((SortedSet<Referrer<T>>) getCollection(), getType(), 10000L);
+ if ( Set.class.isAssignableFrom(getClazz()))
+ return RC.set((Set<Referrer<T>>) getCollection(), getType(), 10000L);
+ return RC.collection(getCollection(), getType(), 10000L);
+ }
+
+ /**
+ * @serialData
+ * @return the type
+ */
+ abstract Ref getType();
+
+ /**
+ * @serialData
+ * @return the collection
+ */
+ abstract Collection<Referrer<T>> getCollection();
+
+ /**
+ * @serialData
+ * @return the class
+ */
+ abstract Class getClazz();
+
+ /**
+ * @return the serialBuilt
+ */
+ Collection<T> getSerialBuilt() {
+ return serialBuilt;
+ }
+
+ /**
+ * @param serialBuilt the serialBuilt to set
+ */
+ Collection<T> setSerialBuilt(Collection<T> serialBuilt) {
+ this.serialBuilt = serialBuilt;
+ return serialBuilt;
+ }
+
+ /**
+ * @return the built
+ */
+ boolean isBuilt() {
+ return built;
+ }
+
+ /**
+ *
+ */
+ void setBuilt() {
+ built = true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ if ( getSerialBuilt() instanceof List || getSerialBuilt() instanceof Set ){
+ return getSerialBuilt().hashCode();
+ }
+ return System.identityHashCode(this);
+ }
+
+ /**
+ * Because equals and hashCode are not defined for collections, we
+ * cannot guarantee consistent behaviour by implementing equals and
+ * hashCode. A collection could be a list, set, queue or deque.
+ * So a List != Queue and a Set != list. therefore equals for collections is
+ * not defined.
+ *
+ * However since two collections may both also be Lists, while abstracted
+ * from the client two lists may still be equal.
+ *
+ * Unfortunately this object, when behaving as a delegate, is not always
+ * equal to the object it is trying to represent.
+ *
+ * @see Collection#equals(java.lang.Object)
+ */
+
+ @Override
+ public boolean equals(Object o){
+ if ( o == this ) return true;
+ if ( getSerialBuilt() instanceof List || getSerialBuilt() instanceof Set ){
+ return getSerialBuilt().equals(o);
+ }
+ return false;
+ }
+
+ @Override
+ public Iterator<T> iterator(){
+ if (getSerialBuilt() != null) return getSerialBuilt().iterator();
+ return new NullIterator<T>();
+ }
+
+ @Override
+ public int size() {
+ if (getSerialBuilt() != null) return getSerialBuilt().size();
+ return 0;
+ }
+
+ public boolean add(T t){
+ if (getSerialBuilt() != null) return getSerialBuilt().add(t);
+ return false;
+ }
+
+ private void readObject(ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ }
+
+ // If deserialized state may have changed since, if another type of
+ // Map, apart from ImmutableMap, uses the same builder for example.
+ final Object writeReplace() {
+ if ( isBuilt()) return getSerialBuilt();
+ return this;
+ }
+
+ final Object readResolve() throws ObjectStreamException{
+ try {
+ return setSerialBuilt(build());
+ } catch (InstantiationException ex) {
+ throw new InvalidClassException(this.getClass().toString(), ex.fillInStackTrace().toString());
+ } catch (IllegalAccessException ex) {
+ throw new InvalidClassException(this.getClass().toString(), ex.fillInStackTrace().toString());
+ }
+ }
+
+
+ public boolean addAll(int index, Collection<? extends T> c) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).addAll(index, c);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T get(int index) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).get(index);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T set(int index, T element) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).set(index, element);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void add(int index, T element) {
+ if (getSerialBuilt() instanceof List)((List<T>) getSerialBuilt()).add(index, element);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T remove(int index) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).remove(index);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public int indexOf(Object o) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).indexOf(o);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public int lastIndexOf(Object o) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).lastIndexOf(o);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public ListIterator<T> listIterator() {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).listIterator();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public ListIterator<T> listIterator(int index) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).listIterator(index);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public List<T> subList(int fromIndex, int toIndex) {
+ if (getSerialBuilt() instanceof List) return ((List<T>) getSerialBuilt()).subList(fromIndex, toIndex);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public Comparator<? super T> comparator() {
+ if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).comparator();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public SortedSet<T> subSet(T fromElement, T toElement) {
+ if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).subSet(fromElement, toElement);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public SortedSet<T> headSet(T toElement) {
+ if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).headSet(toElement);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public SortedSet<T> tailSet(T fromElement) {
+ if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).tailSet(fromElement);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T first() {
+ if (getSerialBuilt() instanceof SortedSet) return ((SortedSet<T>) getSerialBuilt()).first();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T last() {
+ if (getSerialBuilt() instanceof SortedSet)
+ return ((SortedSet<T>) getSerialBuilt()).last();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T lower(T e) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).lower(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T floor(T e) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).floor(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T ceiling(T e) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).ceiling(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T higher(T e) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).higher(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T pollFirst() {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).pollFirst();
+ if (getSerialBuilt() instanceof Deque)
+ return ((Deque<T>) getSerialBuilt()).pollFirst();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T pollLast() {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).pollLast();
+ if (getSerialBuilt() instanceof Deque)
+ return ((Deque<T>) getSerialBuilt()).pollLast();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public NavigableSet<T> descendingSet() {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).descendingSet();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public Iterator<T> descendingIterator() {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).descendingIterator();
+ if (getSerialBuilt() instanceof Deque)
+ return ((Deque<T>) getSerialBuilt()).descendingIterator();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public NavigableSet<T> subSet(T fromElement, boolean fromInclusive, T toElement, boolean toInclusive) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).subSet(fromElement, fromInclusive, toElement, toInclusive);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public NavigableSet<T> headSet(T toElement, boolean inclusive) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).headSet(toElement, inclusive);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public NavigableSet<T> tailSet(T fromElement, boolean inclusive) {
+ if (getSerialBuilt() instanceof NavigableSet)
+ return ((NavigableSet<T>) getSerialBuilt()).tailSet(fromElement, inclusive);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offer(T e) {
+ if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).offer(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T remove() {
+ if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).remove();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T poll() {
+ if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).poll();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T element() {
+ if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).element();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T peek() {
+ if (getSerialBuilt() instanceof Queue) return ((Queue<T>) getSerialBuilt()).peek();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void addFirst(T e) {
+ if (getSerialBuilt() instanceof Deque) ((Deque<T>) getSerialBuilt()).addFirst(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void addLast(T e) {
+ if (getSerialBuilt() instanceof Deque) ((Deque<T>) getSerialBuilt()).addLast(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offerFirst(T e) {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).offerFirst(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offerLast(T e) {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).offerLast(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T removeFirst() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).removeFirst();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T removeLast() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).removeLast();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T getFirst() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).getFirst();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T getLast() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).getLast();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T peekFirst() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).peekFirst();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T peekLast() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).peekLast();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean removeFirstOccurrence(Object o) {
+ if (getSerialBuilt() instanceof Deque)
+ return ((Deque<T>) getSerialBuilt()).removeFirstOccurrence(o);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean removeLastOccurrence(Object o) {
+ if (getSerialBuilt() instanceof Deque)
+ return ((Deque<T>) getSerialBuilt()).removeLastOccurrence(o);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void push(T e) {
+ if (getSerialBuilt() instanceof Deque)((Deque<T>) getSerialBuilt()).push(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T pop() {
+ if (getSerialBuilt() instanceof Deque) return ((Deque<T>) getSerialBuilt()).pop();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void put(T e) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingQueue) ((BlockingQueue<T>) getSerialBuilt()).put(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).offer(e, timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T take() throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).take();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).poll(timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public int remainingCapacity() {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).remainingCapacity();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public int drainTo(Collection<? super T> c) {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).drainTo(c);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public int drainTo(Collection<? super T> c, int maxElements) {
+ if (getSerialBuilt() instanceof BlockingQueue)
+ return ((BlockingQueue<T>) getSerialBuilt()).drainTo(c, maxElements);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void putFirst(T e) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ ((BlockingDeque<T>) getSerialBuilt()).putFirst(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public void putLast(T e) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ ((BlockingDeque<T>) getSerialBuilt()).putLast(e);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offerFirst(T e, long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).offerFirst(e, timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public boolean offerLast(T e, long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).offerLast(e, timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T takeFirst() throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).takeFirst();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T takeLast() throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).takeLast();
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).pollFirst(timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+ public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+ if (getSerialBuilt() instanceof BlockingDeque)
+ return ((BlockingDeque<T>) getSerialBuilt()).pollLast(timeout, unit);
+ throw new UnsupportedOperationException("Unsupported Interface Method.");
+ }
+
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReadResolveFixForMapCircularReferences.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,316 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ * Implements every map interface that a reference map may expose and
+ * forwards each call to the map returned by {@link #getMap()}. A method
+ * belonging to an interface that map does not implement throws
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * NOTE(review): nothing in this class assigns the private map field and
+ * build() is still a stub, so getMap() returns null unless a subclass
+ * overrides it; the basic Map methods (size, get, put, ...) then throw
+ * NullPointerException.
+ *
+ * @author peter
+ */
+abstract class ReadResolveFixForMapCircularReferences<K,V>
+ extends SerializationOfReferenceMap<K,V>
+ implements Map<K,V>, SortedMap<K,V>, NavigableMap<K,V>, ConcurrentMap<K,V>,
+ ConcurrentNavigableMap<K,V>{
+
+    /** Message used whenever the delegate map lacks the requested interface. */
+    private static final String UNSUPPORTED = "Unsupported Interface Method.";
+
+    // Builder created Map on deserialization
+    private volatile Map<K,V> map = null;
+    private volatile boolean built = false;
+
+    /* ---- private helpers: read the delegate once, then cast or reject ---- */
+
+    private UnsupportedOperationException unsupported() {
+        return new UnsupportedOperationException(UNSUPPORTED);
+    }
+
+    /** Returns the delegate if one is present, else throws UnsupportedOperationException. */
+    private Map<K,V> anyMap() {
+        Map<K,V> m = getMap();
+        if (m != null) return m;
+        throw unsupported();
+    }
+
+    /** Returns the delegate as a SortedMap or throws UnsupportedOperationException. */
+    private SortedMap<K,V> asSortedMap() {
+        Map<K,V> m = getMap();
+        if (m instanceof SortedMap) return (SortedMap<K,V>) m;
+        throw unsupported();
+    }
+
+    /** Returns the delegate as a NavigableMap or throws UnsupportedOperationException. */
+    private NavigableMap<K,V> asNavigableMap() {
+        Map<K,V> m = getMap();
+        if (m instanceof NavigableMap) return (NavigableMap<K,V>) m;
+        throw unsupported();
+    }
+
+    /** Returns the delegate as a ConcurrentMap or throws UnsupportedOperationException. */
+    private ConcurrentMap<K,V> asConcurrentMap() {
+        Map<K,V> m = getMap();
+        if (m instanceof ConcurrentMap) return (ConcurrentMap<K,V>) m;
+        throw unsupported();
+    }
+
+    /**
+     * Returns the delegate as a ConcurrentNavigableMap or throws
+     * UnsupportedOperationException. Used by every method whose return type
+     * is ConcurrentNavigableMap, so that a merely sorted or navigable
+     * delegate is rejected cleanly instead of failing with a
+     * ClassCastException.
+     */
+    private ConcurrentNavigableMap<K,V> asConcurrentNavigableMap() {
+        Map<K,V> m = getMap();
+        if (m instanceof ConcurrentNavigableMap) return (ConcurrentNavigableMap<K,V>) m;
+        throw unsupported();
+    }
+
+    /* ---- SortedMap ---- */
+
+    @Override
+    public Comparator<? super K> comparator() {
+        return asSortedMap().comparator();
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
+        return asConcurrentNavigableMap().subMap(fromKey, toKey);
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> headMap(K toKey) {
+        return asConcurrentNavigableMap().headMap(toKey);
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
+        return asConcurrentNavigableMap().tailMap(fromKey);
+    }
+
+    @Override
+    public K firstKey() {
+        return asSortedMap().firstKey();
+    }
+
+    @Override
+    public K lastKey() {
+        return asSortedMap().lastKey();
+    }
+
+    /**
+     * The return type is fixed to NavigableSet by ConcurrentNavigableMap,
+     * so the delegate must at least be a NavigableMap; its navigable key
+     * set is returned.
+     */
+    @Override
+    public NavigableSet<K> keySet() {
+        return asNavigableMap().navigableKeySet();
+    }
+
+    /** Available for any delegate map, sorted or not. */
+    @Override
+    public Collection<V> values() {
+        return anyMap().values();
+    }
+
+    /** Available for any delegate map, sorted or not. */
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return anyMap().entrySet();
+    }
+
+    /* ---- Map ---- */
+
+    @Override
+    public int size() {
+        return getMap().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return getMap().isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return getMap().containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return getMap().containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return getMap().get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        return getMap().put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        return getMap().remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        getMap().putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        getMap().clear();
+    }
+
+    /* ---- NavigableMap ---- */
+
+    @Override
+    public Entry<K, V> lowerEntry(K key) {
+        return asNavigableMap().lowerEntry(key);
+    }
+
+    @Override
+    public K lowerKey(K key) {
+        return asNavigableMap().lowerKey(key);
+    }
+
+    @Override
+    public Entry<K, V> floorEntry(K key) {
+        return asNavigableMap().floorEntry(key);
+    }
+
+    @Override
+    public K floorKey(K key) {
+        return asNavigableMap().floorKey(key);
+    }
+
+    @Override
+    public Entry<K, V> ceilingEntry(K key) {
+        return asNavigableMap().ceilingEntry(key);
+    }
+
+    @Override
+    public K ceilingKey(K key) {
+        return asNavigableMap().ceilingKey(key);
+    }
+
+    @Override
+    public Entry<K, V> higherEntry(K key) {
+        return asNavigableMap().higherEntry(key);
+    }
+
+    @Override
+    public K higherKey(K key) {
+        return asNavigableMap().higherKey(key);
+    }
+
+    @Override
+    public Entry<K, V> firstEntry() {
+        return asNavigableMap().firstEntry();
+    }
+
+    @Override
+    public Entry<K, V> lastEntry() {
+        return asNavigableMap().lastEntry();
+    }
+
+    @Override
+    public Entry<K, V> pollFirstEntry() {
+        return asNavigableMap().pollFirstEntry();
+    }
+
+    @Override
+    public Entry<K, V> pollLastEntry() {
+        return asNavigableMap().pollLastEntry();
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> descendingMap() {
+        return asConcurrentNavigableMap().descendingMap();
+    }
+
+    @Override
+    public NavigableSet<K> navigableKeySet() {
+        return asNavigableMap().navigableKeySet();
+    }
+
+    @Override
+    public NavigableSet<K> descendingKeySet() {
+        return asNavigableMap().descendingKeySet();
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+        return asConcurrentNavigableMap().subMap(fromKey, fromInclusive, toKey, toInclusive);
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
+        return asConcurrentNavigableMap().headMap(toKey, inclusive);
+    }
+
+    @Override
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
+        return asConcurrentNavigableMap().tailMap(fromKey, inclusive);
+    }
+
+    /* ---- ConcurrentMap ---- */
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return asConcurrentMap().putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        return asConcurrentMap().remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        return asConcurrentMap().replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        return asConcurrentMap().replace(key, value);
+    }
+
+    /**
+     * @return the map
+     */
+    public Map<K,V> getMap() {
+        return map;
+    }
+
+    /**
+     * @return the built
+     */
+    public boolean isBuilt() {
+        return built;
+    }
+
+    /**
+     * Not implemented: always throws UnsupportedOperationException.
+     */
+    @Override
+    Map<K, V> build() throws InstantiationException, IllegalAccessException, ObjectStreamException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * @serialData
+     * @return the type
+     */
+    abstract Ref getType();
+
+    /**
+     * @serialData
+     * @return the map of referrers
+     */
+    abstract Map<Referrer<K>,Referrer<V>> getRefMap();
+
+    /**
+     * @serialData
+     * @return the class
+     */
+    abstract Class getClazz();
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/Ref.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,193 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * Ref enum represents types of references available for use in java
+ * collection framework implementations.
+ * </p><p>
+ * Only use STRONG, WEAK_IDENTITY and TIME based references as keys
+ * in Maps, use of other reference types should be discouraged
+ * due to unpredictable results, when for example, an equal WEAK
+ * Reference Key disappears due to garbage collection, or SOFT reference keys
+ * aren't garbage collected as expected.
+ * </p><p>
+ * Map implementations delete their {@code key -> value} mapping when either
+ * the key or value References become unreachable. ConcurrentMaps will retry
+ * until putIfAbsent is successful when an existing Referrer key is cleared.
+ * </p><p>
+ * Only use STRONG and TIME based references in Queue's, other types break
+ * Queue's contract, null has special meaning, a cleared reference returns
+ * null.
+ * </p><p>
+ * Object.toString() is overridden in reference implementations to call
+ * toString() on the referent, if still reachable, otherwise the reference
+ * calls the superclass toString() method, where the superclass is a java
+ * Reference subclass. Consideration is being given to returning an empty
+ * string "" instead; if you feel strongly about this, please contact the author.
+ * </p><p>
+ * Phantom references are not used, they are designed to replace
+ * {@link Object#finalize() } and remain unreachable, but not garbage collected until
+ * the {@link PhantomReference} also becomes unreachable, get() always returns
+ * null.
+ * </p><p>
+ * TIME and SOFT and SOFT_IDENTITY references update their access timestamp,
+ * when Referrer.get(), Referrer.equals() or Referrer.toString() is called.
+ * SOFT and SOFT_IDENTITY do so lazily and are not guaranteed to
+ * succeed in updating the access time. SOFT references also update their
+ * access timestamp when either Referrer.hashCode() or Comparable.compareTo()
+ * is called.
+ * </p><p>
+ * For sets and map keys that require traversal using a Comparator or
+ * Comparable, access times will be updated during each traversal. In
+ * these circumstances, SOFT and SOFT_IDENTITY references are typically not enqueued and
+ * do not behave as expected.
+ * </p><p>
+ * SOFT references are only suited for use in lists or as values in maps and not
+ * suitable for keys in hash or tree maps.
+ * </p><p>
+ * TIME references only update their access timestamp during traversal when
+ * a Comparator or Comparable returns zero (a successful match), or when equals is true.
+ * TIME references are suitable for use as keys in tree and hash based maps
+ * as well as sets and tasks in Queues. In fact TIME is the only referrer
+ * suitable for use in Queue's and their subtypes. Tasks in a Queue, if
+ * timed out, are first cancelled, then removed, they are not cleared as
+ * doing so could cause a null poll() return when a queue is not empty,
+ * which would violate the contract for Queue.
+ * </p><p>
+ * SOFT_IDENTITY references are suitable for use as
+ * keys in hash tables, hash maps and hash sets, since hashCode() does not update
+ * the access timestamp, while equals() does. SOFT_IDENTITY references are not
+ * recommended for use in tree maps, tree sets or queues.
+ * </p>
+ * @see Reference
+ * @see WeakReference
+ * @see SoftReference
+ * @see PhantomReference
+ * @see Comparable
+ * @author Peter Firmstone.
+ */
+public enum Ref {
+ /**
+ * <P>
+ * TIME References implement equals based on equality of the referent
+ * objects. Time references are STRONG references that are removed
+ * after a period of no access, even if they are strongly
+ * referenced outside the cache, they are removed. TIME references don't
+ * rely on Garbage Collection algorithms.
+ * </P><P>
+ * TIME References support cancellation of tasks implementing Future, when
+ * the reference times out, if it contains a Future, it is cancelled.
+ * </P><P>
+ * A call to equals(), get() or toString(), will cause the timestamp on
+ * the reference to be updated, whereas hashCode() will not, in addition,
+ * Comparators and referents that implement Comparable, only update the
+ * timestamp if they return 0. This allows referents to be inspected
+ * without update when they don't match.
+ * </P><P>
+ * TIME References require synchronisation for iteration, so during
+ * cleaning periods, a synchronised Collection or Map will be locked.
+ * A lock is still obtained for iterating over Concurrent Maps and
+ * Collections, however this does not normally synchronise access between threads,
+ * only other cleaning task threads.
+ * </P>
+ * @see Future
+ *
+ */
+ TIME,
+ /**
+ * <P>
+ * SOFT References implement equals based on equality of the referent
+ * objects, while the referent is still reachable. The hashCode
+ * implementation is based on the referent implementation of hashCode,
+ * while the referent is reachable.
+ * </P><P>
+ * SOFT References implement Comparable allowing the referent Objects
+ * to be compared if they implement Comparable. If the referent Object
+ * doesn't implement Comparable, the hashCodes of the References are
+ * compared instead. If the referent Objects don't implement Comparable,
+ * then they shouldn't really be used in sorted collections.
+ * </P><P>
+ * Garbage collection must be the same as SoftReference.
+ * </P>
+ * @see SoftReference
+ * @see WeakReference
+ * @see Comparable
+ */
+ SOFT,
+ /**
+ * <P>
+ * SOFT_IDENTITY References implement equals based on identity == of the
+ * referent objects.
+ * </P><P>
+ * Garbage collection must be the same as SoftReference.
+ * </P>
+ * @see SoftReference
+ */
+ SOFT_IDENTITY,
+ /**
+ * <P>
+ * WEAK References implement equals based on equality of the referent
+ * objects, while the referent is still reachable. The hashCode
+ * implementation is based on the referent implementation of hashCode,
+ * while the referent is reachable.
+ * </P><P>
+ * WEAK References implement Comparable allowing the referent Objects
+ * to be compared if they implement Comparable. If the referent Object
+ * doesn't implement Comparable, the hashCodes of the References are
+ * compared instead. If the referent Objects don't implement Comparable,
+ * then they shouldn't really be used in sorted collections.
+ * </P><P>
+ * Garbage collection must be the same as WeakReference.
+ * </P>
+ * @see WeakReference
+ * @see Comparable
+ */
+ WEAK,
+ /**
+ * <P>
+ * WEAK_IDENTITY References implement equals based on identity == of the
+ * referent objects.
+ * </P><P>
+ * Garbage collection must be the same as WeakReference.
+ * </P>
+ * @see WeakReference
+ */
+ WEAK_IDENTITY,
+ /**
+ * <P>
+ * STRONG References implement equals and hashCode() based on the
+ * equality of the underlying Object.
+ * </P><P>
+ * STRONG References implement Comparable allowing the referent Objects
+ * to be compared if they implement Comparable. If the referent Object
+ * doesn't implement Comparable, the hashCodes of the References are
+ * compared instead. If the referent Objects don't implement Comparable,
+ * then they shouldn't really be used in sorted collections.
+ * </P><P>
+ * Garbage collection doesn't occur unless the Reference is cleared.
+ * </P>
+ * @see Comparable
+ */
+ STRONG
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2012 peter.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.concurrent;
+
+/**
+ * Declares {@code poll()}, the only ReferenceQueue method used, so that code
+ * need not depend directly on ReferenceQueue and other Queue implementations
+ * can be used instead.
+ *
+ * @param <T> type parameter of the queue; {@code poll()} itself returns Object
+ * @author peter
+ */
+interface RefQueue<T> {
+    /**
+     * Polls this queue.
+     *
+     * @return the polled object, as defined by the implementation
+     */
+    Object poll();
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/RefReferenceQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2012 peter.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.concurrent;
+
+import java.lang.ref.ReferenceQueue;
+
+/**
+ * A ReferenceQueue that implements RefQueue
+ *
+ * @author Peter Firmstone
+ */
+class RefReferenceQueue<T> extends ReferenceQueue<T> implements RefQueue<T>{}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingDeque.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,156 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link BlockingDeque} facade over a deque of {@link Referrer}s.
+ * Elements passed in are wrapped with {@code wrapObj(e, true, false)} before
+ * being handed to the underlying deque; Referrers taken from the underlying
+ * deque are unwrapped with {@code Referrer.get()} before being returned.
+ * Every operation calls {@code processQueue()} first.
+ *
+ * @param <T> the element type seen by callers
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingDeque<T> extends ReferenceDeque<T> implements BlockingDeque<T>{
+    private static final long serialVersionUID = 1L;
+
+    private final BlockingDeque<Referrer<T>> deque;
+
+    ReferenceBlockingDeque(BlockingDeque<Referrer<T>> deque, Ref type, boolean gcThreads, long gcCycle){
+        super(deque, type, gcThreads, gcCycle);
+        this.deque = deque;
+    }
+
+    /** Direct deserialization is rejected; always throws. */
+    private void readObject(ObjectInputStream stream)
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+    /** Wraps an element in the Referrer form stored in the underlying deque. */
+    private Referrer<T> enqueueable(T e) {
+        return wrapObj(e, true, false);
+    }
+
+    /** Returns the referent of r, or null when r itself is null. */
+    private T referentOrNull(Referrer<T> r) {
+        return r != null ? r.get() : null;
+    }
+
+    /**
+     * Validates a drainTo target and decorates it so the underlying deque can
+     * drain its Referrers into it.
+     *
+     * @throws NullPointerException if c is null
+     * @throws IllegalArgumentException if c is this deque
+     */
+    @SuppressWarnings("unchecked")
+    private Collection<Referrer<T>> drainSink(Collection<? super T> c) {
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        return new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+    }
+
+    @Override
+    public void putFirst(T e) throws InterruptedException {
+        processQueue();
+        deque.putFirst(enqueueable(e));
+    }
+
+    @Override
+    public void putLast(T e) throws InterruptedException {
+        processQueue();
+        deque.putLast(enqueueable(e));
+    }
+
+    @Override
+    public boolean offerFirst(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.offerFirst(enqueueable(e), timeout, unit);
+    }
+
+    @Override
+    public boolean offerLast(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.offerLast(enqueueable(e), timeout, unit);
+    }
+
+    /** @return the referent of the taken Referrer; null if its get() returns null */
+    @Override
+    public T takeFirst() throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.takeFirst());
+    }
+
+    /** @return the referent of the taken Referrer; null if its get() returns null */
+    @Override
+    public T takeLast() throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.takeLast());
+    }
+
+    @Override
+    public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.pollFirst(timeout, unit));
+    }
+
+    @Override
+    public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.pollLast(timeout, unit));
+    }
+
+    @Override
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        deque.put(enqueueable(e));
+    }
+
+    @Override
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return deque.offer(enqueueable(e), timeout, unit);
+    }
+
+    @Override
+    public T take() throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.take());
+    }
+
+    @Override
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return referentOrNull(deque.poll(timeout, unit));
+    }
+
+    @Override
+    public int remainingCapacity() {
+        // Process the queue first, as every other operation of this class does.
+        processQueue();
+        return deque.remainingCapacity();
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        return deque.drainTo(drainSink(c));
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        return deque.drainTo(drainSink(c), maxElements);
+    }
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceBlockingQueue.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,92 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link BlockingQueue} facade over a queue of {@link Referrer}s.
+ * Elements passed in are wrapped with {@code wrapObj(e, true, false)} before
+ * being handed to the underlying queue; Referrers taken from the underlying
+ * queue are unwrapped with {@code Referrer.get()} before being returned.
+ * Every operation calls {@code processQueue()} first.
+ *
+ * @param <T> the element type seen by callers
+ * @author Peter Firmstone.
+ */
+class ReferenceBlockingQueue<T> extends ReferencedQueue<T> implements BlockingQueue<T> {
+    private static final long serialVersionUID = 1L;
+    private final BlockingQueue<Referrer<T>> queue;
+
+    ReferenceBlockingQueue(BlockingQueue<Referrer<T>> queue, Ref type, boolean gcThreads, long gcCycle){
+        super(queue, type, gcThreads, gcCycle);
+        this.queue = queue;
+    }
+
+    /** Direct deserialization is rejected; always throws. */
+    private void readObject(ObjectInputStream stream)
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+    /** Wraps an element in the Referrer form stored in the underlying queue. */
+    private Referrer<T> enqueueable(T e) {
+        return wrapObj(e, true, false);
+    }
+
+    /** Returns the referent of r, or null when r itself is null. */
+    private T referentOrNull(Referrer<T> r) {
+        return r != null ? r.get() : null;
+    }
+
+    /**
+     * Validates a drainTo target and decorates it so the underlying queue can
+     * drain its Referrers into it.
+     *
+     * @throws NullPointerException if c is null
+     * @throws IllegalArgumentException if c is this queue
+     */
+    @SuppressWarnings("unchecked")
+    private Collection<Referrer<T>> drainSink(Collection<? super T> c) {
+        if (c == null) throw new NullPointerException();
+        if (c == this) throw new IllegalArgumentException();
+        return new CollectionDecorator<T>( (Collection<T>) c, getRQF(), false, true);
+    }
+
+    @Override
+    public void put(T e) throws InterruptedException {
+        processQueue();
+        queue.put(enqueueable(e));
+    }
+
+    @Override
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return queue.offer(enqueueable(e), timeout, unit);
+    }
+
+    /** @return the referent of the taken Referrer; null if its get() returns null */
+    @Override
+    public T take() throws InterruptedException {
+        processQueue();
+        return referentOrNull(queue.take());
+    }
+
+    @Override
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        processQueue();
+        return referentOrNull(queue.poll(timeout, unit));
+    }
+
+    @Override
+    public int remainingCapacity() {
+        processQueue();
+        return queue.remainingCapacity();
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c) {
+        processQueue();
+        return queue.drainTo(drainSink(c));
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        processQueue();
+        return queue.drainTo(drainSink(c), maxElements);
+    }
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollection.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,207 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.io.WriteAbortedException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Collection whose elements are held through Reference objects. The
+ * developer may choose any Collection implementation to store the
+ * References; it is passed in at runtime.
+ *
+ * The underlying Collection implementation governs the specific behaviour
+ * of this Collection.
+ *
+ * Synchronisation must be implemented by the underlying Collection and
+ * cannot be performed externally to this class. The underlying Collection
+ * must also be mutable. Objects will be removed automatically from the
+ * underlying Collection when they are eligible for garbage collection.
+ *
+ * Weak, Weak Identity, Soft, Soft Identity or Strong references may be used.
+ * This Collection may be used as an Object pool cache or any other purpose
+ * that requires unique memory handling.
+ *
+ * For concurrent threads, it is recommended to encapsulate the underlying
+ * collection in a multi read, single write collection for scalability.
+ *
+ * @param <T> the element type seen by callers
+ * @see Ref
+ * @see ConcurrentCollections#multiReadCollection(java.util.Collection)
+ * @author Peter Firmstone.
+ */
+class ReferenceCollection<T> extends AbstractCollection<T>
+        implements Collection<T>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Collection<Referrer<T>> col;
+    private final ReferenceQueuingFactory<T, Referrer<T>> rqf;
+    private final Ref type;
+
+    /**
+     * Creates the collection together with its own ReferenceProcessor, which
+     * is started with the given cycle once the fields have been assigned.
+     * STRONG references get no queue, TIME references a TimedRefQueue and
+     * every other type a RefReferenceQueue.
+     */
+    @SuppressWarnings("unchecked")
+    ReferenceCollection(Collection<Referrer<T>> col, Ref type, boolean gcThread, long gcCycle){
+        final RefQueue<T> queue;
+        if (type == Ref.STRONG) {
+            queue = null;
+        } else if (type == Ref.TIME) {
+            queue = new TimedRefQueue();
+        } else {
+            queue = new RefReferenceQueue<T>();
+        }
+        this.col = col;
+        ReferenceProcessor<T> processor = new ReferenceProcessor<T>(col, type, queue, gcThread, col);
+        this.type = type;
+        this.rqf = processor;
+        processor.start(gcCycle);
+    }
+
+    /** Creates the collection around an existing ReferenceQueuingFactory. */
+    ReferenceCollection(Collection<Referrer<T>> col,
+            ReferenceQueuingFactory<T, Referrer<T>> rqf, Ref type){
+        this.col = col;
+        this.rqf = rqf;
+        this.type = type;
+    }
+
+    /** Currently does nothing: the call to rqf.processQueue() is disabled. */
+    void processQueue(){
+        //rqf.processQueue();
+    }
+
+    /** @return the ReferenceQueuingFactory used to wrap elements */
+    ReferenceQueuingFactory<T, Referrer<T>> getRQF(){
+        return this.rqf;
+    }
+
+    /** @return the reference type this collection was created with */
+    Ref getRef(){
+        return this.type;
+    }
+
+    /** Wraps t in a Referrer by delegating to the ReferenceQueuingFactory. */
+    Referrer<T> wrapObj(T t, boolean enqueue, boolean temporary){
+        return rqf.referenced(t, enqueue, temporary);
+    }
+
+    public int size() {
+        processQueue();
+        return col.size();
+    }
+
+    public boolean isEmpty() {
+        processQueue();
+        return col.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        processQueue();
+        // Look up with a temporary, non-enqueued Referrer.
+        Referrer<T> probe = wrapObj((T) o, false, true);
+        return col.contains(probe);
+    }
+
+    /**
+     * The returned Iterator may yield null values if garbage collection
+     * runs during iteration, so always check for null.
+     *
+     * @return an iterator over T - elements are possibly null.
+     */
+    public Iterator<T> iterator() {
+        processQueue();
+        Iterator<Referrer<T>> referrers = col.iterator();
+        return new ReferenceIterator<T>(referrers);
+    }
+
+    public boolean add(T e) {
+        processQueue();
+        Referrer<T> wrapped = wrapObj(e, true, false);
+        return col.add(wrapped);
+    }
+
+    public boolean remove(Object o) {
+        processQueue();
+        // Remove using a temporary, non-enqueued Referrer.
+        Referrer<T> probe = wrapObj((T) o, false, true);
+        return col.remove(probe);
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean containsAll(Collection<?> c) {
+        processQueue();
+        Collection<Referrer<T>> decorated
+                = new CollectionDecorator<T>((Collection<T>) c, getRQF(), false, true);
+        return col.containsAll(decorated);
+    }
+
+    @SuppressWarnings("unchecked")
+    public boolean addAll(Collection<? extends T> c) {
+        processQueue();
+        Collection<Referrer<T>> decorated
+                = new CollectionDecorator<T>((Collection<T>) c, getRQF(), true, false);
+        return col.addAll(decorated);
+    }
+
+    public void clear() {
+        col.clear();
+    }
+
+    /*
+     * toString, hashCode and equals below are suitable implementations for
+     * subclasses also.
+     */
+    public String toString(){
+        return col.toString();
+    }
+
+    /**
+     * Uses the underlying collection's hash code when it is a List or a Set,
+     * otherwise the identity hash code of this object.
+     */
+    @Override
+    public int hashCode() {
+        boolean valueBased = col instanceof List || col instanceof Set;
+        return valueBased ? col.hashCode() : System.identityHashCode(this);
+    }
+
+    /**
+     * Equals and hashCode are not defined for collections in general: a
+     * collection could be a list, set, queue or deque, a List != Queue and a
+     * Set != List, so consistent behaviour cannot be guaranteed.
+     *
+     * However, two collections may both be Lists (or Sets) while abstracted
+     * from the client, and may then still be equal; in that case the decision
+     * is delegated to the underlying collection.
+     * @see Collection#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object o){
+        if ( o == this ) {
+            return true;
+        }
+        boolean valueBased = col instanceof List || col instanceof Set;
+        return valueBased && col.equals(o);
+    }
+
+    /** Serializes a builder, created by SerializationOfReferenceCollection, in place of this object. */
+    final Object writeReplace() throws ObjectStreamException {
+        try {
+            // returns a Builder instead of this class.
+            return SerializationOfReferenceCollection.create(getClass(), col, type );
+        } catch (InstantiationException instantiationFailure) {
+            throw new WriteAbortedException("Unable to create serialization proxy", instantiationFailure);
+        } catch (IllegalAccessException accessFailure) {
+            throw new WriteAbortedException("Unable to create serialization proxy", accessFailure);
+        }
+    }
+
+    /** Direct deserialization is rejected; always throws. */
+    private void readObject(ObjectInputStream stream)
+            throws InvalidObjectException{
+        throw new InvalidObjectException("Builder required");
+    }
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionRefreshAfterSerialization.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,59 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.InvalidObjectException;
+import java.io.ObjectStreamException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Extends the build step of ReadResolveFixCollectionCircularReferences:
+ * after the collection has been built, every Referrer in the serial
+ * collection is refreshed with the ReferenceQueuingFactory of the built
+ * ReferenceCollection.
+ *
+ * @param <T> the element type of the built collection
+ * @author peter
+ */
+abstract class ReferenceCollectionRefreshAfterSerialization<T>
+        extends ReadResolveFixCollectionCircularReferences<T> {
+
+    ReferenceCollectionRefreshAfterSerialization() {
+    }
+
+    /**
+     * Builds the collection via the superclass. If the result is a
+     * ReferenceCollection, each non-null Referrer returned by
+     * {@code getCollection()} is refreshed with that collection's
+     * ReferenceQueuingFactory.
+     *
+     * @return the collection produced by {@code super.build()}
+     * @throws InvalidObjectException if a non-null Referrer is not an
+     *         AbstractReferrerDecorator
+     */
+    @Override
+    final Collection<T> build() throws InstantiationException, IllegalAccessException, ObjectStreamException {
+        Collection<T> result = super.build();
+        /* What if the underlying collection is immutable?
+         * The ReferenceQueuingFactory is unknown until the ReferenceCollection
+         * has been built.
+         */
+        if (!(result instanceof ReferenceCollection)) {
+            return result;
+        }
+        ReferenceQueuingFactory<T, Referrer<T>> rqf = ((ReferenceCollection<T>) result).getRQF();
+        for (Referrer<T> ref : getCollection()) {
+            if (ref == null) {
+                continue;
+            }
+            if (!(ref instanceof AbstractReferrerDecorator)) {
+                // The message names the type that is actually checked above.
+                throw new InvalidObjectException("Referrers must be AbstractReferrerDecorator instances for ReferenceCollection");
+            }
+            ((AbstractReferrerDecorator<T>) ref).refresh(rqf);
+        }
+        return result;
+    }
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceCollectionSerialData.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,88 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The serial form of ReferenceCollection and all of its subclasses.
+ *
+ * While the serial form of this class will remain compatible with itself,
+ * ReferenceCollection may replace this implementation with another
+ * at some point in the future.
+ *
+ * This class will still be able to de-serialise into a ReferenceCollection.
+ *
+ * @param <T> the element type of the referenced collection
+ * @author peter
+ */
+class ReferenceCollectionSerialData<T>
+        extends ReferenceCollectionRefreshAfterSerialization<T> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** @serial the reference type */
+    private Ref type;
+    /** @serial the underlying collection of Referrers */
+    private Collection<Referrer<T>> collection;
+    /** @serial the ReferenceCollection class */
+    private Class referenceCollectionClass;
+
+    /**
+     * Stores the three arguments as given; none of them may be null.
+     *
+     * @throws NullPointerException if any argument is null
+     */
+    @SuppressWarnings("unchecked")
+    ReferenceCollectionSerialData( Class clazz,
+            Collection<Referrer<T>> underlyingCollection, Ref type)
+            throws InstantiationException, IllegalAccessException{
+        boolean anyMissing = clazz == null || underlyingCollection == null || type == null;
+        if (anyMissing) {
+            throw new NullPointerException("null parameters prohibited");
+        }
+        this.referenceCollectionClass = clazz;
+        this.collection = underlyingCollection;
+        this.type = type;
+    }
+
+    @Override
+    public Ref getType() {
+        return this.type;
+    }
+
+    @Override
+    public Collection<Referrer<T>> getCollection() {
+        return this.collection;
+    }
+
+    @Override
+    public Class getClazz() {
+        return this.referenceCollectionClass;
+    }
+
+    /** Reads the default serial form and rejects any null field. */
+    private void readObject(ObjectInputStream in)
+            throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        boolean anyMissing = referenceCollectionClass == null || collection == null || type == null;
+        if (anyMissing) {
+            throw new InvalidObjectException("null fields found after deserialization");
+        }
+    }
+
+    /** Writes the default serial form. */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceComparator.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,52 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.io.IOException;
+import java.io.InvalidObjectException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ *
+ * @param <T>
+ * @author Peter Firmstone.
+ */
+class ReferenceComparator<T> extends AbstractReferenceComparator<T> implements Serializable{
+ private static final long serialVersionUID = 1L;
+ private Comparator<? super T> comparator;
+ ReferenceComparator(Comparator<? super T> comparator){
+ if ( comparator == null ) throw new IllegalArgumentException("Null value prohibited");
+ this.comparator = comparator;
+ }
+
+ Comparator<? super T> get(){
+ return comparator;
+ }
+
+ private void readObject(ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ if ( get() == null ) throw new InvalidObjectException("Null value prohibited");
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ }
+
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentMap.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,107 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+
+import java.lang.ref.Reference;
+import java.util.Random;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A referenced hash map that encapsulates and utilises whichever
+ * {@link ConcurrentMap} implementation is passed in at construction.
+ *
+ * Being based on a ConcurrentMap implementation, it does not accept null
+ * keys or values.
+ *
+ * Using identity based References for keys is recommended but not
+ * mandatory; relying on equal keys gives unexpected results when one key is
+ * no longer strongly reachable and has been garbage collected and removed
+ * from the Map.
+ *
+ * When either a key or a value is no longer strongly reachable, its mapping
+ * is queued for removal and garbage collection, in compliance with the
+ * Reference implementation selected.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @see Ref
+ * @author Peter Firmstone.
+ *
+ * @since 2.3
+ */
+class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V> implements ConcurrentMap<K, V> {
+
+    // Open question kept from the original author: a ConcurrentMap must be
+    // protected from null values, which changes its behaviour - is that a problem?
+    private final ConcurrentMap<Referrer<K>, Referrer<V>> map;
+
+    /**
+     * Forwards every argument to the superclass constructor and keeps the
+     * backing map for the atomic operations implemented here.
+     */
+    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>,Referrer<V>> map, Ref key, Ref val,
+            boolean gcThreads, long gcKeyCycle, long gcValCycle){
+        super (map, key, val, gcThreads, gcKeyCycle, gcValCycle);
+        this.map = map;
+    }
+
+    /**
+     * Forwards every argument to the superclass constructor and keeps the
+     * backing map for the atomic operations implemented here.
+     */
+    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>, Referrer<V>> map,
+            ReferenceQueuingFactory<K, Referrer<K>> krqf,
+            ReferenceQueuingFactory<V, Referrer<V>> vrqf,
+            Ref key, Ref val){
+        super(map, krqf, vrqf, key, val);
+        this.map = map;
+    }
+
+    /**
+     * Associates {@code key} with {@code value} unless a mapping with a live
+     * value is already present.
+     *
+     * @return the live value already mapped, or null if this call installed
+     *         {@code value} (including when it replaced a mapping whose
+     *         referent had already been cleared)
+     */
+    public V putIfAbsent(K key, V value) {
+        processQueue(); // there may be a slight delay before the atomic putIfAbsent
+        Referrer<K> wrappedKey = wrapKey(key, true, false);
+        Referrer<V> wrappedValue = wrapVal(value, true, false);
+        Referrer<V> present = map.putIfAbsent(wrappedKey, wrappedValue);
+        while ( present != null ) {
+            V live = present.get();
+            if ( live != null ) {
+                // A live value is already mapped; report it unchanged.
+                return live;
+            }
+            // The mapped referrer is stale: its referent was garbage collected
+            // but the mapping has not been removed yet. Treat it as absent
+            // and try to swap our value in atomically.
+            if ( map.replace(wrappedKey, present, wrappedValue) ) {
+                return null; // officially there was no record
+            }
+            // Another thread may have replaced it first; start over.
+            present = map.putIfAbsent(wrappedKey, wrappedValue);
+        }
+        return null;
+    }
+
+    /**
+     * Removes the mapping only if {@code key} is currently mapped to
+     * {@code value}, delegating the comparison to the backing map.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean remove(Object key, Object value) {
+        processQueue();
+        Referrer<K> wrappedKey = wrapKey((K) key, false, true);
+        Referrer<V> wrappedValue = wrapVal((V) value, false, true);
+        return map.remove(wrappedKey, wrappedValue);
+    }
+
+    /**
+     * Replaces the mapping only if {@code key} is currently mapped to
+     * {@code oldValue}, delegating the comparison to the backing map.
+     */
+    public boolean replace(K key, V oldValue, V newValue) {
+        processQueue();
+        Referrer<K> wrappedKey = wrapKey(key, false, true);
+        Referrer<V> expected = wrapVal(oldValue, false, true);
+        Referrer<V> replacement = wrapVal(newValue, true, false);
+        return map.replace(wrappedKey, expected, replacement);
+    }
+
+    /**
+     * Replaces the mapping for {@code key} only if one is present.
+     *
+     * @return the referent of the previously mapped referrer, or null when
+     *         the backing map reported no previous mapping
+     */
+    public V replace(K key, V value) {
+        processQueue();
+        Referrer<K> wrappedKey = wrapKey(key, false, true);
+        Referrer<V> replacement = wrapVal(value, true, false);
+        Referrer<V> previous = map.replace(wrappedKey, replacement);
+        return previous == null ? null : previous.get();
+    }
+}
Added: river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java?rev=1702174&view=auto
==============================================================================
--- river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java (added)
+++ river/jtsk/skunk/qa-refactor-namespace/trunk/src/org/apache/river/concurrent/ReferenceConcurrentNavigableMap.java Thu Sep 10 06:59:28 2015
@@ -0,0 +1,242 @@
+/* Copyright (c) 2010-2012 Zeus Project Services Pty Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.concurrent;
+
+import java.util.Comparator;
+import java.util.Map.Entry;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ * {@link ConcurrentNavigableMap} facade over a backing
+ * {@code ConcurrentNavigableMap<Referrer<K>, Referrer<V>>}.
+ *
+ * Every public operation first calls the inherited {@code processQueue()},
+ * wraps any key argument with the inherited {@code wrapKey}, delegates to
+ * the backing map, and unwraps or re-wraps the result for the caller.
+ *
+ * Methods that return a single entry return {@code null} when the backing
+ * map has no matching entry, as required by
+ * {@link java.util.NavigableMap}; they never hand out a facade around a
+ * null entry.
+ *
+ * @param <K> key type presented to callers
+ * @param <V> value type presented to callers
+ * @author Peter Firmstone.
+ */
+class ReferenceConcurrentNavigableMap<K,V>
+extends ReferenceConcurrentMap<K,V> implements ConcurrentNavigableMap<K,V>{
+    private final ConcurrentNavigableMap<Referrer<K>,Referrer<V>> map;
+
+    /**
+     * Forwards every argument to the superclass constructor and keeps the
+     * backing map for the navigation operations implemented here.
+     */
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Referrer<K>, Referrer<V>> map, Ref keyRef, Ref valRef, boolean gcThreads, long gcKeyCycle, long gcValCycle){
+        super(map, keyRef, valRef, gcThreads, gcKeyCycle, gcValCycle);
+        this.map = map;
+    }
+
+    /**
+     * Forwards every argument to the superclass constructor and keeps the
+     * backing map for the navigation operations implemented here. Also used
+     * internally to build sub-map and descending views.
+     */
+    ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Referrer<K>, Referrer<V>> map,
+            ReferenceQueuingFactory<K, Referrer<K>> krqf, ReferenceQueuingFactory<V, Referrer<V>> vrqf, Ref key, Ref val){
+        super(map, krqf, vrqf, key, val);
+        this.map = map;
+    }
+
+    /**
+     * Wraps a view of the backing map in a new facade that uses this map's
+     * queuing factories and reference types.
+     */
+    private ConcurrentNavigableMap<K, V> navigableView(
+            ConcurrentNavigableMap<Referrer<K>, Referrer<V>> view) {
+        return new ReferenceConcurrentNavigableMap<K,V>(
+            view,
+            getKeyRQF(),
+            getValRQF(), keyRef(), valRef()
+        );
+    }
+
+    /**
+     * Wraps a backing entry in a facade, or returns null when the backing
+     * map produced no entry.
+     */
+    private Entry<K, V> entryFacadeOrNull(Entry<Referrer<K>, Referrer<V>> entry) {
+        if ( entry == null ) return null;
+        return new ReferenceEntryFacade<K,V>(entry, getValRQF());
+    }
+
+    /**
+     * Returns the referent of {@code ref}, or null when {@code ref} itself
+     * is null.
+     */
+    private K referentOrNull(Referrer<K> ref) {
+        if ( ref != null ) return ref.get();
+        return null;
+    }
+
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
+        processQueue();
+        return navigableView(
+            map.subMap(
+                wrapKey(fromKey, false, true),
+                fromInclusive,
+                wrapKey(toKey, false, true),
+                toInclusive
+            )
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
+        processQueue();
+        return navigableView(map.headMap(wrapKey(toKey, false, true), inclusive));
+    }
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
+        processQueue();
+        return navigableView(map.tailMap(wrapKey(fromKey, false, true), inclusive));
+    }
+
+    public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
+        processQueue();
+        return navigableView(
+            map.subMap(wrapKey(fromKey, false, true), wrapKey(toKey, false, true))
+        );
+    }
+
+    public ConcurrentNavigableMap<K, V> headMap(K toKey) {
+        processQueue();
+        return navigableView(map.headMap(wrapKey(toKey, false, true)));
+    }
+
+    public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
+        processQueue();
+        return navigableView(map.tailMap(wrapKey(fromKey, false, true)));
+    }
+
+    public ConcurrentNavigableMap<K, V> descendingMap() {
+        processQueue();
+        return navigableView(map.descendingMap());
+    }
+
+    public NavigableSet<K> navigableKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF(), keyRef());
+    }
+
+    public NavigableSet<K> keySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.keySet(), getKeyRQF(), keyRef());
+    }
+
+    public NavigableSet<K> descendingKeySet() {
+        processQueue();
+        return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF(), keyRef());
+    }
+
+    /** @return the matching entry, or null if the backing map has none */
+    public Entry<K, V> lowerEntry(K key) {
+        processQueue();
+        return entryFacadeOrNull(map.lowerEntry(wrapKey(key, false, true)));
+    }
+
+    public K lowerKey(K key) {
+        processQueue();
+        return referentOrNull(map.lowerKey(wrapKey(key, false, true)));
+    }
+
+    /** @return the matching entry, or null if the backing map has none */
+    public Entry<K, V> floorEntry(K key) {
+        processQueue();
+        return entryFacadeOrNull(map.floorEntry(wrapKey(key, false, true)));
+    }
+
+    public K floorKey(K key) {
+        processQueue();
+        return referentOrNull(map.floorKey(wrapKey(key, false, true)));
+    }
+
+    /** @return the matching entry, or null if the backing map has none */
+    public Entry<K, V> ceilingEntry(K key) {
+        processQueue();
+        return entryFacadeOrNull(map.ceilingEntry(wrapKey(key, false, true)));
+    }
+
+    public K ceilingKey(K key) {
+        processQueue();
+        return referentOrNull(map.ceilingKey(wrapKey(key, false, true)));
+    }
+
+    /** @return the matching entry, or null if the backing map has none */
+    public Entry<K, V> higherEntry(K key) {
+        processQueue();
+        return entryFacadeOrNull(map.higherEntry(wrapKey(key, false, true)));
+    }
+
+    public K higherKey(K key) {
+        processQueue();
+        return referentOrNull(map.higherKey(wrapKey(key, false, true)));
+    }
+
+    /** @return the first entry, or null if the backing map returns none */
+    public Entry<K, V> firstEntry() {
+        processQueue();
+        return entryFacadeOrNull(map.firstEntry());
+    }
+
+    /** @return the last entry, or null if the backing map returns none */
+    public Entry<K, V> lastEntry() {
+        processQueue();
+        return entryFacadeOrNull(map.lastEntry());
+    }
+
+    /**
+     * Removes the first entry from the backing map.
+     *
+     * @return the removed entry, or null if the backing map returned none
+     */
+    public Entry<K, V> pollFirstEntry() {
+        processQueue();
+        return entryFacadeOrNull(map.pollFirstEntry());
+    }
+
+    /**
+     * Removes the last entry from the backing map.
+     *
+     * @return the removed entry, or null if the backing map returned none
+     */
+    public Entry<K, V> pollLastEntry() {
+        processQueue();
+        return entryFacadeOrNull(map.pollLastEntry());
+    }
+
+    /**
+     * @return the comparator wrapped by the backing map's
+     *         {@code ReferenceComparator}, or null when the backing map's
+     *         comparator is not a {@code ReferenceComparator} (which
+     *         includes the case where it has no comparator)
+     */
+    @SuppressWarnings("unchecked")
+    public Comparator<? super K> comparator() {
+        processQueue();
+        Comparator<? super Referrer<K>> c = map.comparator();
+        if ( c instanceof ReferenceComparator){
+            return ((ReferenceComparator) c).get();
+        }
+        return null;
+    }
+
+    public K firstKey() {
+        processQueue();
+        return referentOrNull(map.firstKey());
+    }
+
+    public K lastKey() {
+        processQueue();
+        return referentOrNull(map.lastKey());
+    }
+}