You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2011/10/03 10:24:24 UTC
svn commit: r1178329 [1/2] - in /river/jtsk/skunk/peterConcurrentPolicy:
src/org/apache/river/impl/util/ test/src/org/apache/river/impl/util/
Author: peter_firmstone
Date: Mon Oct 3 08:24:23 2011
New Revision: 1178329
URL: http://svn.apache.org/viewvc?rev=1178329&view=rev
Log:
Reference Collection utilities null reference checks and non blocking concurrency
Added:
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java (with props)
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java (with props)
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java (with props)
Modified:
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftIdentityReferenceKey.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/SoftReferenceKey.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakIdentityReferenceKey.java
river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/WeakReferenceKey.java
river/jtsk/skunk/peterConcurrentPolicy/test/src/org/apache/river/impl/util/ReferenceCollectionTest.java
Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java Mon Oct 3 08:24:23 2011
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ *
+ * @author peter
+ */
+public class CollectionWrapper<T> extends AbstractCollection<Reference<T>> implements Collection<Reference<T>> {
+ private final Collection<T> col;
+ private final ReferenceQueuingFactory<T, Reference<T>> rqf;
+ private final boolean enque;
+
+ CollectionWrapper(Collection<T> col, ReferenceQueuingFactory<T, Reference<T>> rqf, boolean enque){
+ this.col = col;
+ this.rqf = rqf;
+ this.enque = enque;
+ }
+
+ @Override
+ public Iterator<Reference<T>> iterator() {
+ return new Iter<T>(col.iterator(), rqf);
+ }
+
+ @Override
+ public int size() {
+ return col.size();
+ }
+
+ public boolean add(Reference<T> t) {
+ return col.add( t != null ? t.get() : null );
+ }
+
+ private class Iter<T> implements Iterator<Reference<T>> {
+ Iterator<T> iterator;
+ private final ReferenceQueuingFactory<T, Reference<T>> rqf;
+ Iter(Iterator<T> it, ReferenceQueuingFactory<T, Reference<T>> rqf){
+ iterator = it;
+ this.rqf = rqf;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Reference<T> next() {
+ return rqf.referenced( iterator.next(), enque);
+ }
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+
+ }
+
+}
Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/CollectionWrapper.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryConversionFactory.java Mon Oct 3 08:24:23 2011
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.impl.util;
-
-/**
- * An interface for converting Map.Entry to disguise contained References.
- *
- * @author Peter Firmstone
- */
-interface EntryConversionFactory<O, R> {
-
- O asFacade(R u);
-
- R asReference(O w, boolean enque);
-
- void processQueue();
-
-}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryFacadeConverter.java Mon Oct 3 08:24:23 2011
@@ -19,33 +19,38 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.AbstractMap.SimpleEntry;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
/**
*
* @author Peter Firmstone.
*/
-class EntryFacadeConverter<K,V> implements EntryConversionFactory<Entry<K,V>, Entry<Reference<K>, Reference<V>>> {
- ReferenceMap<K,V> map;
+class EntryFacadeConverter<K,V> implements ReferenceQueuingFactory<Entry<K,V>, Entry<Reference<K>, Reference<V>>> {
+ private final ReferenceQueuingFactory<K, Reference<K>> krqf;
+ private final ReferenceQueuingFactory<V, Reference<V>> vrqf;
- EntryFacadeConverter(ReferenceMap<K,V> map) {
- this.map = map;
+
+ EntryFacadeConverter(ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf) {
+ this.krqf = krqf;
+ this.vrqf = vrqf;
}
- public Entry<K,V> asFacade(Entry<Reference<K>, Reference<V>> u) {
- return new ReferenceEntryFacade<K, V>(u, map.getValRef(), map.getValQueue());
+ public Entry<K,V> pseudoReferent(Entry<Reference<K>, Reference<V>> u) {
+ return new ReferenceEntryFacade<K, V>(u, vrqf);
}
- public Entry<Reference<K>, Reference<V>> asReference(Entry<K,V> w, boolean enque) {
+ public Entry<Reference<K>, Reference<V>> referenced(Entry<K,V> w, boolean enque) {
+ // The entry could alread by a Reference based Entry obscured by a facade.
return new SimpleEntry<Reference<K>, Reference<V>>(
- map.wrapKey(w.getKey(), enque), map.wrapVal(w.getValue(), enque)
+ krqf.referenced(w.getKey(), enque), vrqf.referenced(w.getValue(), enque)
);
}
public void processQueue() {
- map.processQueue();
+ krqf.processQueue();
+ vrqf.processQueue();
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntryIteratorFacade.java Mon Oct 3 08:24:23 2011
@@ -26,9 +26,9 @@ import java.util.Iterator;
*/
class EntryIteratorFacade<O, R> implements Iterator<O> {
private Iterator<R> iterator;
- private EntryConversionFactory<O, R> wf;
+ private ReferenceQueuingFactory<O, R> wf;
- EntryIteratorFacade(Iterator<R> iterator, EntryConversionFactory<O, R> wf) {
+ EntryIteratorFacade(Iterator<R> iterator, ReferenceQueuingFactory<O, R> wf) {
this.iterator = iterator;
this.wf = wf;
}
@@ -38,7 +38,7 @@ class EntryIteratorFacade<O, R> implemen
}
public O next() {
- return wf.asFacade(iterator.next());
+ return wf.pseudoReferent(iterator.next());
}
public void remove() {
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/EntrySetFacade.java Mon Oct 3 08:24:23 2011
@@ -36,9 +36,9 @@ import java.util.Set;
*/
class EntrySetFacade<O, R> extends AbstractSet<O> implements Set<O> {
private Set<R> set;
- private EntryConversionFactory<O, R> factory;
+ private ReferenceQueuingFactory<O, R> factory;
- EntrySetFacade(Set<R> set, EntryConversionFactory<O, R> wf) {
+ EntrySetFacade(Set<R> set, ReferenceQueuingFactory<O, R> wf) {
this.set = set;
this.factory = wf;
}
@@ -65,6 +65,6 @@ class EntrySetFacade<O, R> extends Abstr
public boolean add(O e) {
factory.processQueue();
if ( e == null ) return false;
- return set.add(factory.asReference(e, true));
+ return set.add(factory.referenced(e, true));
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingDeque.java Mon Oct 3 08:24:23 2011
@@ -19,9 +19,7 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -32,12 +30,10 @@ import java.util.concurrent.TimeUnit;
class ReferenceBlockingDeque<T> extends ReferenceDeque<T> implements BlockingDeque<T>{
private final BlockingDeque<Reference<T>> deque;
- private final Ref type;
ReferenceBlockingDeque(BlockingDeque<Reference<T>> deque, Ref type){
super(deque, type);
this.deque = deque;
- this.type = type;
}
@@ -71,25 +67,33 @@ class ReferenceBlockingDeque<T> extends
public T takeFirst() throws InterruptedException {
processQueue();
- return deque.takeFirst().get();
+ Reference<T> t = deque.takeFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T takeLast() throws InterruptedException {
processQueue();
- return deque.takeLast().get();
+ Reference<T> t = deque.takeLast();
+ if ( t != null ) return t.get();
+ return null;
}
public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
processQueue();
- return deque.pollFirst(timeout, unit).get();
+ Reference<T> t = deque.pollFirst(timeout, unit);
+ if ( t != null ) return t.get();
+ return null;
}
public T pollLast(long timeout, TimeUnit unit) throws InterruptedException {
processQueue();
- return deque.pollLast(timeout, unit).get();
+ Reference<T> t = deque.pollLast(timeout, unit);
+ if ( t != null ) return t.get();
+ return null;
}
@@ -109,13 +113,17 @@ class ReferenceBlockingDeque<T> extends
public T take() throws InterruptedException {
processQueue();
- return deque.take().get();
+ Reference<T> t = deque.take();
+ if ( t != null ) return t.get();
+ return null;
}
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
processQueue();
- return deque.poll(timeout, unit).get();
+ Reference<T> t = deque.poll(timeout, unit);
+ if ( t != null ) return t.get();
+ return null;
}
@@ -126,24 +134,20 @@ class ReferenceBlockingDeque<T> extends
public int drainTo(Collection<? super T> c) {
processQueue();
- Collection<Reference<T>> drain = new ArrayList<Reference<T>>(deque.size());
- int count = deque.drainTo(drain);
- Iterator<Reference<T>> i = drain.iterator();
- while (i.hasNext()){
- c.add(i.next().get());
+ if (c == null) throw new NullPointerException();
+ if (c == this) throw new IllegalArgumentException();
+ @SuppressWarnings("unchecked")
+ Collection<Reference<T>> dr = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+ return deque.drainTo(dr);
}
- return count;
- }
public int drainTo(Collection<? super T> c, int maxElements) {
processQueue();
- Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
- int count = deque.drainTo(drain, maxElements);
- Iterator<Reference<T>> i = drain.iterator();
- while (i.hasNext()){
- c.add(i.next().get());
+ if (c == null) throw new NullPointerException();
+ if (c == this) throw new IllegalArgumentException();
+ @SuppressWarnings("unchecked")
+ Collection<Reference<T>> drain = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+ return deque.drainTo(drain, maxElements);
}
- return count;
}
-}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceBlockingQueue.java Mon Oct 3 08:24:23 2011
@@ -19,9 +19,7 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -32,12 +30,10 @@ import java.util.concurrent.TimeUnit;
*/
class ReferenceBlockingQueue<T> extends ReferencedQueue<T> implements BlockingQueue<T> {
private final BlockingQueue<Reference<T>> queue;
- private final Ref type;
ReferenceBlockingQueue(BlockingQueue<Reference<T>> queue, Ref type){
super(queue, type);
this.queue = queue;
- this.type = type;
}
public void put(T e) throws InterruptedException {
@@ -54,12 +50,16 @@ class ReferenceBlockingQueue<T> extends
public T take() throws InterruptedException {
processQueue();
- return queue.take().get();
+ Reference<T> t = queue.take();
+ if ( t != null ) return t.get();
+ return null;
}
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
processQueue();
- return queue.poll(timeout, unit).get();
+ Reference<T> t = queue.poll(timeout, unit);
+ if ( t != null ) return t.get();
+ return null;
}
public int remainingCapacity() {
@@ -69,24 +69,20 @@ class ReferenceBlockingQueue<T> extends
public int drainTo(Collection<? super T> c) {
processQueue();
- Collection<Reference<T>> drain = new ArrayList<Reference<T>>(queue.size());
- int count = queue.drainTo(drain);
- Iterator<Reference<T>> i = drain.iterator();
- while (i.hasNext()){
- c.add(i.next().get());
+ if (c == null) throw new NullPointerException();
+ if (c == this) throw new IllegalArgumentException();
+ @SuppressWarnings("unchecked")
+ Collection<Reference<T>> dr = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+ return queue.drainTo(dr);
}
- return count;
- }
public int drainTo(Collection<? super T> c, int maxElements) {
processQueue();
- Collection<Reference<T>> drain = new ArrayList<Reference<T>>(maxElements);
- int count = queue.drainTo(drain, maxElements);
- Iterator<Reference<T>> i = drain.iterator();
- while (i.hasNext()){
- c.add(i.next().get());
+ if (c == null) throw new NullPointerException();
+ if (c == this) throw new IllegalArgumentException();
+ @SuppressWarnings("unchecked")
+ Collection<Reference<T>> drain = new CollectionWrapper<T>( (Collection<T>) c, getRQF(), true);
+ return queue.drainTo(drain, maxElements);
}
- return count;
- }
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceCollection.java Mon Oct 3 08:24:23 2011
@@ -20,7 +20,7 @@ package org.apache.river.impl.util;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
-import java.util.ArrayList;
+import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -49,51 +49,32 @@ import java.util.Set;
* @see ConcurrentCollections#multiReadCollection(java.util.Collection)
* @author Peter Firmstone.
*/
-class ReferenceCollection<T> implements Collection<T>{
- private Collection<Reference<T>> col;
- private Ref type;
- private ReferenceQueue<T> queue;
+class ReferenceCollection<T> extends AbstractCollection<T> implements Collection<T>{
+ private final Collection<Reference<T>> col;
+ private final ReferenceQueuingFactory<T, Reference<T>> rqf;
ReferenceCollection(Collection<Reference<T>> col, Ref type){
- this.col = col;
- this.type = type;
- queue = new ReferenceQueue<T>();
+ this(col, new ReferenceProcessor<T>(col, type, type == Ref.STRONG ? null : new ReferenceQueue<T>()));
}
- ReferenceCollection(Collection<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
+ ReferenceCollection(Collection<Reference<T>> col,
+ ReferenceQueuingFactory<T, Reference<T>> rqf){
this.col = col;
- this.type = type;
- this.queue = queue;
+ this.rqf = rqf;
}
void processQueue(){
- Object t = null;
- while ( (t = queue.poll()) != null){
- col.remove(t);
+ rqf.processQueue();
}
- }
-
- ReferenceQueue<T> getQueue(){
- return queue;
- }
- Ref getType(){
- return type;
+ ReferenceQueuingFactory<T, Reference<T>> getRQF(){
+ return rqf;
}
Reference<T> wrapObj(T t, boolean enqueue){
- return ReferenceFactory.create(t, enqueue == true ? queue: null , type);
+ return rqf.referenced(t, enqueue);
}
- Collection<Reference<T>> wrapColl( Collection<? extends T> c, boolean enqueue){
- Collection<Reference<T>> refc = new ArrayList<Reference<T>>(c.size());
- Iterator<? extends T> i = c.iterator();
- while (i.hasNext()){
- refc.add(wrapObj(i.next(), false));
- }
- return refc;
- }
-
public int size() {
processQueue();
return col.size();
@@ -122,28 +103,6 @@ class ReferenceCollection<T> implements
return new ReferenceIterator<T>(col.iterator());
}
- public Object[] toArray() {
- processQueue();
- int l = col.size();
- Object[] array = col.toArray(new Object[l]);
- for (int i = 0; i < l; i++){
- array[i] = ((Reference)array[i]).get();
- }
- return array;
- }
-
- @SuppressWarnings("unchecked")
- public <T> T[] toArray(T[] a) {
- processQueue();
- int l = col.size();
- Reference<T>[] result = col.toArray( new Reference[col.size()]);
- if ( a.length < result.length) a = (T[]) new Object[result.length];
- for (int i = 0; i < l; i++){
- a[i] = (T) result[i].get();
- }
- return a;
- }
-
public boolean add(T e) {
processQueue();
return col.add(wrapObj(e, true));
@@ -154,24 +113,18 @@ class ReferenceCollection<T> implements
return col.remove(wrapObj((T) o, false));
}
+
+ @SuppressWarnings("unchecked")
public boolean containsAll(Collection<?> c) {
processQueue();
- return col.containsAll(wrapColl((Collection<T>) c, false));
+ return col.containsAll(new CollectionWrapper<T>((Collection<T>) c, getRQF(), false));
}
+
+ @SuppressWarnings("unchecked")
public boolean addAll(Collection<? extends T> c) {
processQueue();
- return col.addAll(wrapColl((Collection<T>) c, true));
- }
-
- public boolean removeAll(Collection<?> c) {
- processQueue();
- return col.removeAll(wrapColl((Collection<T>) c, false));
- }
-
- public boolean retainAll(Collection<?> c) {
- processQueue();
- return col.retainAll(wrapColl((Collection<T>) c, false));
+ return col.addAll(new CollectionWrapper<T>((Collection<T>) c, getRQF(), true));
}
public void clear() {
@@ -190,10 +143,7 @@ class ReferenceCollection<T> implements
if ( col instanceof List || col instanceof Set ){
return col.hashCode();
}
- int hash = 7;
- hash = 67 * hash + (this.col != null ? this.col.hashCode() : 0);
- hash = 67 * hash + (this.type != null ? this.type.hashCode() : 0);
- return hash;
+ return System.identityHashCode(this);
}
/**
@@ -216,25 +166,4 @@ class ReferenceCollection<T> implements
}
return false;
}
-
-
-// @Override
-// public int hashCode() {
-// int hash = 7;
-// hash = 73 * hash + (this.col != null ? this.col.hashCode() : 0);
-// hash = 73 * hash + (this.type != null ? this.type.hashCode() : 0);
-// hash = 73 * hash + (this.getClass().hashCode());
-// return hash;
-// }
-//
-// @Override
-// public boolean equals(Object o){
-// if ( o == this ) return true;
-// if ( hashCode() != o.hashCode()) return false;
-// if (!( o instanceof Collection)) return false;
-// ReferenceCollection that = (ReferenceCollection) o;
-// if ( type.equals(that.type) && col.equals(that.col)) return true;
-// return false;
-// }
-
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceComparator.java Mon Oct 3 08:24:23 2011
@@ -29,11 +29,12 @@ import java.util.Comparator;
class ReferenceComparator<T> implements Comparator<Reference<T>> {
private final Comparator<? super T> comparator;
ReferenceComparator(Comparator<? super T> comparator){
+ if ( comparator == null ) throw new IllegalArgumentException("Null value prohibited");
this.comparator = comparator;
}
public int compare(Reference<T> o1, Reference<T> o2) {
- return comparator.compare(o1.get(), o2.get());
+ return comparator.compare( o1 != null ? o1.get() : null, o2 != null ? o2.get() : null);
}
@Override
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentMap.java Mon Oct 3 08:24:23 2011
@@ -20,7 +20,6 @@ package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.concurrent.ConcurrentMap;
/**
@@ -49,7 +48,6 @@ import java.util.concurrent.ConcurrentMa
*/
class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V> implements ConcurrentMap<K, V> {
-
// ConcurrentMap must be protected from null values? It changes it's behaviour, is that a problem?
private final ConcurrentMap<Reference<K>, Reference<V>> map;
@@ -58,50 +56,52 @@ class ReferenceConcurrentMap<K, V> exten
this.map = map;
}
- ReferenceConcurrentMap(ConcurrentMap<Reference<K>,Reference<V>> map,
- Ref key, Ref val, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
- super (map, key, val, keyQueue, valQueue);
+ ReferenceConcurrentMap(ConcurrentMap<Reference<K>, Reference<V>> map,
+ ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
+ super(map, krqf, vrqf);
this.map = map;
}
public V putIfAbsent(K key, V value) {
processQueue(); //may be a slight delay before atomic putIfAbsent
- if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
Reference<K> k = wrapKey(key, true);
Reference<V> v = wrapVal(value, true);
Reference<V> val = map.putIfAbsent(k, v);
- if ( val != null ) return val.get();
+ while ( val != null ) {
+ V existed = val.get();
+ // We hold a strong reference to value, so
+ if ( existed == null ){
+ // stale reference must be replaced, it has been garbage collect but hasn't
+ // been removed, we must treat it like the entry doesn't exist.
+ if ( map.replace(k, val, v)){
+ // replace successful
+ return null; // Because officially there was no record.
+ } else {
+ // Another thread may have replaced it.
+ val = map.putIfAbsent(k, v);
+ }
+ } else {
+ return existed;
+ }
+ }
return null;
}
+ @SuppressWarnings("unchecked")
public boolean remove(Object key, Object value) {
processQueue();
- if (key == null || value == null) return false;
- @SuppressWarnings("unchecked")
- Reference<K> k = wrapKey( (K) key, false);
- @SuppressWarnings("unchecked")
- Reference<V> v = wrapVal( (V) value, false);
- return map.remove(k, v);
+ return map.remove(wrapKey((K) key, false), wrapVal((V) value, false));
}
public boolean replace(K key, V oldValue, V newValue) {
processQueue();
- if (key == null || oldValue == null || newValue == null) return false;
- Reference<K> k = wrapKey(key, true);
- Reference<V> vOld = wrapVal(oldValue, false);
- Reference<V> vNew = wrapVal(newValue, true);
- return map.replace(k, vOld, vNew);
+ return map.replace(wrapKey(key, true), wrapVal(oldValue, false), wrapVal(newValue, false));
}
public V replace(K key, V value) {
processQueue();
- if (key == null || value == null) throw new NullPointerException("null key or value not allowed");
- Reference<K> k = wrapKey(key, true);
- Reference<V> v = wrapVal(value, true);
- Reference<V> val = map.replace(k, v);
+ Reference<V> val = map.replace(wrapKey(key, true), wrapVal(value, true));
if ( val != null ) return val.get();
return null;
}
-
-
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceConcurrentNavigableMap.java Mon Oct 3 08:24:23 2011
@@ -19,7 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.Comparator;
import java.util.Map.Entry;
import java.util.NavigableSet;
@@ -33,24 +32,19 @@ import java.util.concurrent.ConcurrentNa
*/
class ReferenceConcurrentNavigableMap<K,V>
extends ReferenceConcurrentMap<K,V> implements ConcurrentNavigableMap<K,V>{
- private ConcurrentNavigableMap<Reference<K>,Reference<V>> map;
- private Ref keyRef;
- private Ref valRef;
+ private final ConcurrentNavigableMap<Reference<K>,Reference<V>> map;
ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
super(map, keyRef, valRef);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
ReferenceConcurrentNavigableMap(ConcurrentNavigableMap<Reference<K>, Reference<V>> map,
- Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
- super(map, keyRef, valRef, keyQueue, valQueue);
+ ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
+ super(map, krqf, vrqf);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
+
public ConcurrentNavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
@@ -60,208 +54,172 @@ extends ReferenceConcurrentMap<K,V> impl
wrapKey(toKey, false),
toInclusive
),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> headMap(K toKey, boolean inclusive) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.headMap(wrapKey(toKey, false), inclusive),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> tailMap(K fromKey, boolean inclusive) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.tailMap(wrapKey(fromKey, false), inclusive),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> subMap(K fromKey, K toKey) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.subMap(wrapKey(fromKey, false), wrapKey(toKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> headMap(K toKey) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.headMap(wrapKey(toKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> tailMap(K fromKey) {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.tailMap(wrapKey(fromKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public ConcurrentNavigableMap<K, V> descendingMap() {
processQueue();
return new ReferenceConcurrentNavigableMap<K,V>(
map.descendingMap(),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
-
public NavigableSet<K> navigableKeySet() {
processQueue();
- return new ReferenceNavigableSet<K>(map.navigableKeySet(), keyRef, getKeyQueue());
+ return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF());
}
-
public NavigableSet<K> keySet() {
processQueue();
- return new ReferenceNavigableSet<K>(map.keySet(), keyRef, getKeyQueue());
+ return new ReferenceNavigableSet<K>(map.keySet(), getKeyRQF());
}
-
public NavigableSet<K> descendingKeySet() {
processQueue();
- return new ReferenceNavigableSet<K>(map.descendingKeySet(), keyRef, getKeyQueue());
+ return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF());
}
-
public Entry<K, V> lowerEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.lowerEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public K lowerKey(K key) {
processQueue();
- return map.lowerKey(wrapKey(key, false)).get();
+ Reference<K> k = map.lowerKey(wrapKey(key, false));
+ if ( k != null ) return k.get();
+ return null;
}
-
public Entry<K, V> floorEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.floorEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public K floorKey(K key) {
processQueue();
- return map.floorKey(wrapKey(key, false)).get();
+ Reference<K> k = map.floorKey(wrapKey(key, false));
+ if ( k != null ) return k.get();
+ return null;
}
-
public Entry<K, V> ceilingEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.ceilingEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public K ceilingKey(K key) {
processQueue();
- return map.ceilingKey(wrapKey(key, false)).get();
+ Reference<K> k = map.ceilingKey(wrapKey(key, false));
+ if ( k != null ) return k.get();
+ return null;
}
-
public Entry<K, V> higherEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.higherEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public K higherKey(K key) {
processQueue();
- return map.higherKey(wrapKey(key, false)).get();
+ Reference<K> k = map.higherKey(wrapKey(key, false));
+ if ( k != null ) return k.get();
+ return null;
}
-
public Entry<K, V> firstEntry() {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.firstEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public Entry<K, V> lastEntry() {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.lastEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public Entry<K, V> pollFirstEntry() {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.pollFirstEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
public Entry<K, V> pollLastEntry() {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.pollLastEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
-
@SuppressWarnings("unchecked")
public Comparator<? super K> comparator() {
processQueue();
@@ -272,17 +230,17 @@ extends ReferenceConcurrentMap<K,V> impl
return null;
}
-
public K firstKey() {
processQueue();
- return map.firstKey().get();
+ Reference<K> k = map.firstKey();
+ if ( k != null ) return k.get();
+ return null;
}
-
public K lastKey() {
processQueue();
- return map.lastKey().get();
+ Reference<K> k = map.lastKey();
+ if ( k != null ) return k.get();
+ return null;
}
-
-
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceDeque.java Mon Oct 3 08:24:23 2011
@@ -58,42 +58,58 @@ class ReferenceDeque<T> extends Referenc
public T removeFirst() {
processQueue();
- return deque.removeFirst().get();
+ Reference<T> t = deque.removeFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T removeLast() {
processQueue();
- return deque.removeLast().get();
+ Reference<T> t = deque.removeLast();
+ if ( t != null ) return t.get();
+ return null;
}
public T pollFirst() {
processQueue();
- return deque.pollFirst().get();
+ Reference<T> t = deque.pollFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T pollLast() {
processQueue();
- return deque.pollLast().get();
+ Reference<T> t = deque.pollLast();
+ if ( t != null ) return t.get();
+ return null;
}
public T getFirst() {
processQueue();
- return deque.getFirst().get();
+ Reference<T> t = deque.getFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T getLast() {
processQueue();
- return deque.getLast().get();
+ Reference<T> t = deque.getLast();
+ if ( t != null ) return t.get();
+ return null;
}
public T peekFirst() {
processQueue();
- return deque.peekFirst().get();
+ Reference<T> t = deque.peekFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T peekLast() {
processQueue();
- return deque.peekLast().get();
+ Reference<T> t = deque.peekLast();
+ if ( t != null ) return t.get();
+ return null;
}
public boolean removeFirstOccurrence(Object o) {
@@ -118,7 +134,9 @@ class ReferenceDeque<T> extends Referenc
public T pop() {
processQueue();
- return deque.pop().get();
+ Reference<T> t = deque.pop();
+ if ( t != null ) return t.get();
+ return null;
}
public Iterator<T> descendingIterator() {
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceEntryFacade.java Mon Oct 3 08:24:23 2011
@@ -19,7 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.Map;
import java.util.Map.Entry;
@@ -28,27 +27,30 @@ import java.util.Map.Entry;
* @author Peter Firmstone.
*/
class ReferenceEntryFacade<K, V> implements Map.Entry<K, V> {
- private final Map.Entry<Reference<K>, Reference<V>> entry;
- private final ReferenceQueue<V> queue;
- private final Ref valRef;
+ private final Entry<Reference<K>, Reference<V>> entry;
+ private final ReferenceQueuingFactory<V, Reference<V>> rqf;
- ReferenceEntryFacade(Map.Entry<Reference<K>, Reference<V>> entry,
- Ref valRef, ReferenceQueue<V> queue) {
+ ReferenceEntryFacade(Entry<Reference<K>, Reference<V>> entry, ReferenceQueuingFactory<V, Reference<V>> rqf){
this.entry = entry;
- this.queue = queue;
- this.valRef = valRef;
+ this.rqf = rqf;
}
public K getKey() {
- return entry.getKey().get();
+ Reference<K> k = entry.getKey();
+ if ( k != null ) return k.get();
+ return null;
}
public V getValue() {
- return entry.getValue().get();
+ Reference<V> v = entry.getValue();
+ if ( v != null ) return v.get();
+ return null;
}
public V setValue(V value) {
- return entry.setValue(wrapVal(value, true)).get();
+ Reference<V> v = entry.setValue(wrapVal(value, true));
+ if ( v != null ) return v.get();
+ return null;
}
/**
@@ -58,7 +60,7 @@ class ReferenceEntryFacade<K, V> impleme
*/
@Override
public int hashCode() {
- return (getKey()==null ? 0 : getKey().hashCode()) ^
+ return (getKey()==null ? 0 : getKey().hashCode()) ^
(getValue()==null ? 0 : getValue().hashCode());
}
@@ -78,7 +80,7 @@ class ReferenceEntryFacade<K, V> impleme
}
private Reference<V> wrapVal(V val, boolean enque) {
- return ReferenceFactory.create(val, enque == true ? queue : null, valRef);
+ return rqf.referenced(val, enque);
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceIterator.java Mon Oct 3 08:24:23 2011
@@ -29,6 +29,7 @@ class ReferenceIterator<T> implements It
private final Iterator<Reference<T>> iterator;
ReferenceIterator(Iterator<Reference<T>> iterator) {
+ if ( iterator == null ) throw new IllegalArgumentException("iterator cannot be null");
this.iterator = iterator;
}
@@ -37,7 +38,9 @@ class ReferenceIterator<T> implements It
}
public T next() {
- return iterator.next().get();
+ Reference<T> t = iterator.next();
+ if ( t != null ) return t.get();
+ return null;
}
public void remove() {
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceList.java Mon Oct 3 08:24:23 2011
@@ -19,7 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -52,8 +51,8 @@ class ReferenceList<T> extends Reference
this.list = list;
}
- private ReferenceList(List<Reference<T>> list, Ref type, ReferenceQueue<T> queue){
- super(list, type, queue);
+ ReferenceList(List<Reference<T>> list, ReferenceQueuingFactory<T, Reference<T>> rqf){
+ super(list, rqf);
this.list = list;
}
@@ -99,19 +98,24 @@ class ReferenceList<T> extends Reference
return hashCode;
}
+ @SuppressWarnings("unchecked")
public boolean addAll(int index, Collection<? extends T> c) {
processQueue();
- return list.addAll(index, wrapColl(c, true));
+ return list.addAll(index, new CollectionWrapper<T>((Collection<T>) c, getRQF(), true));
}
public T get(int index) {
processQueue();
- return list.get(index).get();
+ Reference<T> r = list.get(index);
+ if (r != null) return r.get();
+ return null;
}
public T set(int index, T element) {
processQueue();
- return list.set(index, wrapObj(element, true)).get();
+ Reference<T> r = list.set(index, wrapObj(element, true));
+ if (r != null) return r.get();
+ return null;
}
public void add(int index, T element) {
@@ -121,7 +125,9 @@ class ReferenceList<T> extends Reference
public T remove(int index) {
processQueue();
- return list.remove(index).get();
+ Reference<T> r = list.remove(index);
+ if (r != null) return r.get();
+ return null;
}
@SuppressWarnings("unchecked")
@@ -138,25 +144,28 @@ class ReferenceList<T> extends Reference
public ListIterator<T> listIterator() {
processQueue();
- return new ListIteratorWrap<T>(list.listIterator(), this);
+ return new ReferenceListIterator<T>(list.listIterator(), getRQF());
}
public ListIterator<T> listIterator(int index) {
processQueue();
- return new ListIteratorWrap<T>(list.listIterator(index), this);
+ return new ReferenceListIterator<T>(list.listIterator(index), getRQF());
}
public List<T> subList(int fromIndex, int toIndex) {
processQueue();
- List<T> sub = new ReferenceList<T>(list.subList(fromIndex, toIndex), getType(), getQueue());
+ List<T> sub = new ReferenceList<T>(list.subList(fromIndex, toIndex), getRQF());
return sub;
}
- private class ListIteratorWrap<T> implements ListIterator<T>{
+ private class ReferenceListIterator<T> implements ListIterator<T>{
ListIterator<Reference<T>> iterator;
- ReferenceCollection<T> col;
- private ListIteratorWrap(ListIterator<Reference<T>> iterator, ReferenceCollection<T> c){
+ ReferenceQueuingFactory<T, Reference<T>> rqf;
+ private ReferenceListIterator(ListIterator<Reference<T>> iterator, ReferenceQueuingFactory<T, Reference<T>> rqf ){
+ if ( iterator == null || rqf == null ) throw
+ new NullPointerException("Null iterator or reference queuing factory not allowed");
this.iterator = iterator;
+ this.rqf = rqf;
}
public boolean hasNext() {
@@ -164,7 +173,9 @@ class ReferenceList<T> extends Reference
}
public T next() {
- return iterator.next().get();
+ Reference<T> t = iterator.next();
+ if ( t != null ) return t.get();
+ return null;
}
public boolean hasPrevious() {
@@ -172,7 +183,9 @@ class ReferenceList<T> extends Reference
}
public T previous() {
- return iterator.previous().get();
+ Reference<T> t = iterator.previous();
+ if ( t != null ) return t.get();
+ return null;
}
public int nextIndex() {
@@ -188,11 +201,11 @@ class ReferenceList<T> extends Reference
}
public void set(T e) {
- iterator.set(col.wrapObj(e, true));
+ iterator.set(rqf.referenced(e, true));
}
public void add(T e) {
- iterator.add(col.wrapObj( e, true));
+ iterator.add(rqf.referenced( e, true));
}
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceMap.java Mon Oct 3 08:24:23 2011
@@ -22,8 +22,6 @@ import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.AbstractMap;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -54,47 +52,44 @@ import java.util.Set;
* @param <V>
* @author Peter Firmstone.
*/
-class ReferenceMap<K, V> extends AbstractMap<K, V> implements Map<K, V>, ReferenceQueueProcessor {
+class ReferenceMap<K, V> extends AbstractMap<K, V> implements Map<K, V> {
- // ConcurrentHashMap must be protected from null values;
- private final Ref keyRef;
- private final Ref valRef;
private final Map<Reference<K>, Reference<V>> map;
- private final ReferenceQueue<K> keyQueue;
- private final ReferenceQueue<V> valQueue;
+ // ReferenceQueuingFactory's handle ReferenceQueue locking policy.
+ private final ReferenceQueuingFactory<K, Reference<K>> krqf;
+ private final ReferenceQueuingFactory<V, Reference<V>> vrqf;
+ // Views of values and keys, reduces object creation.
+ private final Collection<V> values;
+ private final Set<K> keys;
+ private final Set<Entry<K,V>> entrys;
ReferenceMap(Map<Reference<K>,Reference<V>> map, Ref key, Ref val){
- super();
- if (map == null || key == null || val == null ) throw new IllegalArgumentException("Null not allowed");
- this.map = map;
- keyQueue = new ReferenceQueue<K>();
- valQueue = new ReferenceQueue<V>();
- keyRef = key;
- valRef = val;
- }
- ReferenceMap(Map<Reference<K>,Reference<V>> map, Ref key, Ref val, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
- super();
- if (map == null || key == null || val == null ) throw new IllegalArgumentException("Null not allowed");
+ this(map,
+ new ReferenceProcessor<K>(map.keySet(), key, new ReferenceQueue<K>()),
+ new ReferenceProcessor<V>(map.values(), val, new ReferenceQueue<V>())
+ );
+ }
+
+ ReferenceMap(Map<Reference<K>, Reference<V>> map, ReferenceQueuingFactory<K, Reference<K>> krqf, ReferenceQueuingFactory<V, Reference<V>> vrqf){
this.map = map;
- this.keyQueue = keyQueue;
- this.valQueue = valQueue;
- keyRef = key;
- valRef = val;
+ this.krqf = krqf;
+ this.vrqf = vrqf;
+ values = new ReferenceCollection<V>(this.map.values(), vrqf);
+ keys = new ReferenceSet<K>(this.map.keySet(), krqf);
+ // We let this escape during construction, but it's package private only
+ // and doesn't escape the package.
+ entrys = new EntrySetFacade<Entry<K,V>, Entry<Reference<K>,Reference<V>>>(
+ map.entrySet(),
+ new EntryFacadeConverter<K,V>(krqf, vrqf)
+ );
}
- ReferenceQueue<V> getValQueue(){
- return valQueue;
- }
- ReferenceQueue<K> getKeyQueue(){
- return keyQueue;
+ ReferenceQueuingFactory<K, Reference<K>> getKeyRQF(){
+ return krqf;
}
- Ref getKeyRef(){
- return keyRef;
- }
-
- Ref getValRef(){
- return valRef;
+ ReferenceQueuingFactory<V, Reference<V>> getValRQF(){
+ return vrqf;
}
/**
@@ -108,19 +103,12 @@ class ReferenceMap<K, V> extends Abstrac
@SuppressWarnings(value = "unchecked")
public boolean containsKey(Object key) {
processQueue();
- if (key == null) {
- return false;
+ return map.containsKey(wrapKey((K)key, false));
}
- Reference<K> kRef = wrapKey((K) key, false);
- return map.containsKey(kRef);
- }
@SuppressWarnings("unchecked")
public boolean containsValue(Object value) {
processQueue();
- if (value == null) {
- return false;
- }
return map.containsValue(wrapVal((V) value, false));
}
@@ -134,15 +122,10 @@ class ReferenceMap<K, V> extends Abstrac
* mapping from the map, via the Iterator.remove, Set.remove, removeAll,
* retainAll and clear operations.
*
- * This method does not support the add or addAll operations.
- *
* @return
*/
public Set<Entry<K,V>> entrySet() {
- return new EntrySetFacade<Entry<K,V>, Entry<Reference<K>,Reference<V>>>(
- map.entrySet(),
- new EntryFacadeConverter<K,V>(this)
- );
+ return entrys;
}
/**
@@ -150,14 +133,9 @@ class ReferenceMap<K, V> extends Abstrac
*/
public V get(Object key) {
processQueue();
- if (key == null) {
- return null;
- }
@SuppressWarnings(value = "unchecked")
Reference<V> refVal = map.get(wrapKey((K) key, false));
- if (refVal != null) {
- return refVal.get();
- }
+ if (refVal != null) return refVal.get();
return null;
}
@@ -176,19 +154,18 @@ class ReferenceMap<K, V> extends Abstrac
*/
public Set<K> keySet() {
processQueue();
- return new ReferenceSet<K>(map.keySet(), keyRef, keyQueue);
+ return keys;
}
public void processQueue() {
- Reference r = null;
- while ((r = keyQueue.poll()) != null) {
- map.remove(r);
- }
- Collection<Reference<V>> values = map.values();
- while ((r = valQueue.poll()) != null) {
- values.remove(r); //Removes mapping.
+ // If someone else is cleaning out the trash, don't bother waiting,
+ // the underlying Map is responsible for it's own synchronization.
+ // Null values or keys may be returned as a result.
+ // Or a ConcurrentMap that contains a value may no longer contain
+ // it after checking.
+ krqf.processQueue();
+ vrqf.processQueue();
}
- }
/**
* Associates value with given key, returning value previously associated
@@ -199,31 +176,10 @@ class ReferenceMap<K, V> extends Abstrac
*/
public V put(K key, V value) {
processQueue();
- if (key == null) {
+ Reference<V> val = map.put(wrapKey(key, false),wrapVal(value, false));
+ if (val != null) return val.get();
return null;
}
- Reference<V> val = map.put(wrapKey(key, true), wrapVal(value, true));
- if (val != null) {
- return val.get();
- }
- return null;
- }
-
- /**
- *
- * @param m
- */
- public void putAll(Map<? extends K, ? extends V> m) {
- if (m.isEmpty()) return;
- Map<Reference<K>, Reference<V>> refMap
- = new HashMap<Reference<K>, Reference<V>>(m.size());
- Iterator<? extends Map.Entry<? extends K, ? extends V>> i = m.entrySet().iterator();
- while (i.hasNext()){
- Map.Entry<? extends K, ? extends V> ent = i.next();
- refMap.put(wrapKey(ent.getKey(), true),wrapVal(ent.getValue(), true));
- }
- map.putAll(refMap);
- }
/**
* Removes association for given key, returning value previously associated
@@ -231,14 +187,9 @@ class ReferenceMap<K, V> extends Abstrac
*/
public V remove(Object key) {
processQueue();
- if (key == null) {
- return null;
- }
@SuppressWarnings(value = "unchecked")
Reference<V> val = map.remove(wrapKey((K) key, false));
- if (val != null) {
- return val.get();
- }
+ if (val != null) return val.get();
return null;
}
@@ -252,15 +203,15 @@ class ReferenceMap<K, V> extends Abstrac
*/
public Collection<V> values() {
processQueue();
- return new ReferenceCollection<V>(map.values(), valRef, valQueue);
+ return values;
}
Reference<V> wrapVal(V val, boolean enque) {
- return ReferenceFactory.create(val, enque == true ? valQueue : null, valRef);
+ return vrqf.referenced(val, enque);
}
Reference<K> wrapKey(K key, boolean enque) {
- return ReferenceFactory.create(key, enque == true ? keyQueue : null, keyRef);
+ return krqf.referenced(key, enque);
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableMap.java Mon Oct 3 08:24:23 2011
@@ -19,7 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -30,86 +29,84 @@ import java.util.NavigableSet;
*/
class ReferenceNavigableMap<K,V> extends ReferenceSortedMap<K,V> implements NavigableMap<K,V> {
private final NavigableMap<Reference<K>, Reference<V>> map;
- private final Ref keyRef;
- private final Ref valRef;
ReferenceNavigableMap(NavigableMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
super(map, keyRef, valRef);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
- ReferenceNavigableMap( NavigableMap<Reference<K>, Reference<V>> map,
- Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
- super(map, keyRef, valRef, keyQueue, valQueue);
+ ReferenceNavigableMap(NavigableMap<Reference<K>, Reference<V>> map,
+ ReferenceQueuingFactory<K, Reference<K>> krqf,
+ ReferenceQueuingFactory<V, Reference<V>> vrqf){
+ super(map, krqf, vrqf);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
public Entry<K, V> lowerEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.lowerEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
public K lowerKey(K key) {
processQueue();
- return map.lowerKey(wrapKey(key, false)).get();
+ Reference<K> k = map.lowerKey(wrapKey(key, false));
+ if (k != null) return k.get();
+ return null;
}
public Entry<K, V> floorEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.floorEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
public K floorKey(K key) {
processQueue();
- return map.floorKey(wrapKey(key, false)).get();
+ Reference<K> k = map.floorKey(wrapKey(key, false));
+ if (k != null) return k.get();
+ return null;
}
public Entry<K, V> ceilingEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.ceilingEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
public K ceilingKey(K key) {
processQueue();
- return map.ceilingKey(wrapKey(key, false)).get();
+ Reference<K> k = map.ceilingKey(wrapKey(key, false));
+ if (k != null) return k.get();
+ return null;
}
public Entry<K, V> higherEntry(K key) {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.higherEntry(wrapKey(key, false)),
- valRef,
- getValQueue()
+ getValRQF()
);
}
public K higherKey(K key) {
processQueue();
- return map.higherKey(wrapKey(key, false)).get();
+ Reference<K> k = map.higherKey(wrapKey(key, false));
+ if (k != null) return k.get();
+ return null;
}
public Entry<K, V> firstEntry() {
processQueue();
return new ReferenceEntryFacade<K,V>(
map.firstEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
@@ -117,8 +114,7 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceEntryFacade<K,V>(
map.lastEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
@@ -126,8 +122,7 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceEntryFacade<K,V>(
map.pollFirstEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
@@ -135,8 +130,7 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceEntryFacade<K,V>(
map.pollLastEntry(),
- valRef,
- getValQueue()
+ getValRQF()
);
}
@@ -144,21 +138,19 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceNavigableMap<K,V>(
map.descendingMap(),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
public NavigableSet<K> navigableKeySet() {
processQueue();
- return new ReferenceNavigableSet<K>(map.navigableKeySet(), keyRef, getKeyQueue());
+ return new ReferenceNavigableSet<K>(map.navigableKeySet(), getKeyRQF());
}
public NavigableSet<K> descendingKeySet() {
processQueue();
- return new ReferenceNavigableSet<K>(map.descendingKeySet(), keyRef, getKeyQueue());
+ return new ReferenceNavigableSet<K>(map.descendingKeySet(), getKeyRQF());
}
public NavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey, boolean toInclusive) {
@@ -170,10 +162,8 @@ class ReferenceNavigableMap<K,V> extends
wrapKey(toKey, false),
toInclusive
),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
@@ -182,10 +172,8 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceNavigableMap<K,V>(
map.headMap(wrapKey(toKey, false),inclusive),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
@@ -193,10 +181,8 @@ class ReferenceNavigableMap<K,V> extends
processQueue();
return new ReferenceNavigableMap<K,V>(
map.tailMap(wrapKey(fromKey, false),inclusive),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceNavigableSet.java Mon Oct 3 08:24:23 2011
@@ -31,53 +31,62 @@ import java.util.NavigableSet;
public class ReferenceNavigableSet<T>
extends ReferenceSortedSet<T> implements NavigableSet<T> {
private final NavigableSet<Reference<T>> set;
- private final Ref type;
public ReferenceNavigableSet(NavigableSet<Reference<T>> set, Ref type){
super(set, type);
this.set = set;
- this.type = type;
}
- ReferenceNavigableSet(NavigableSet<Reference<T>> set, Ref type, ReferenceQueue<T> queue){
- super(set, type, queue);
+ ReferenceNavigableSet(NavigableSet<Reference<T>> set, ReferenceQueuingFactory<T, Reference<T>> rqf){
+ super(set, rqf);
this.set = set;
- this.type = type;
}
public T lower(T e) {
processQueue();
- return set.lower(wrapObj(e, false)).get();
+ Reference<T> t = set.lower(wrapObj(e, false));
+ if ( t != null ) return t.get();
+ return null;
}
public T floor(T e) {
processQueue();
- return set.floor(wrapObj(e, false)).get();
+ Reference<T> t = set.floor(wrapObj(e, false));
+ if ( t != null ) return t.get();
+ return null;
}
public T ceiling(T e) {
processQueue();
- return set.ceiling(wrapObj(e, false)).get();
+ Reference<T> t = set.ceiling(wrapObj(e, false));
+ if ( t != null ) return t.get();
+ return null;
}
public T higher(T e) {
processQueue();
- return set.higher(wrapObj(e, false)).get();
+ Reference<T> t = set.higher(wrapObj(e, false));
+ if ( t != null ) return t.get();
+ return null;
}
public T pollFirst() {
processQueue();
- return set.pollFirst().get();
+ Reference<T> t = set.pollFirst();
+ if ( t != null ) return t.get();
+ return null;
}
public T pollLast() {
processQueue();
- return set.pollLast().get();
+ Reference<T> t = set.pollLast();
+ if ( t != null ) return t.get();
+ return null;
}
public NavigableSet<T> descendingSet() {
processQueue();
- return new ReferenceNavigableSet<T>(set.descendingSet(), type, getQueue());
+ return new ReferenceNavigableSet<T>(set.descendingSet(), getRQF());
}
public Iterator<T> descendingIterator() {
@@ -93,26 +102,19 @@ public class ReferenceNavigableSet<T>
fromInclusive,
wrapObj(toElement, false),
toInclusive
- ),
- type,
- getQueue()
- );
+ ), getRQF());
}
public NavigableSet<T> headSet(T toElement, boolean inclusive) {
processQueue();
return new ReferenceNavigableSet<T>(
- set.headSet(wrapObj(toElement, false), inclusive),
- type, getQueue()
- );
+ set.headSet(wrapObj(toElement, false), inclusive), getRQF());
}
public NavigableSet<T> tailSet(T fromElement, boolean inclusive) {
processQueue();
return new ReferenceNavigableSet<T>(
- set.tailSet(wrapObj(fromElement, false), inclusive),
- type, getQueue()
- );
+ set.tailSet(wrapObj(fromElement, false), inclusive), getRQF());
}
}
Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java Mon Oct 3 08:24:23 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collection;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ *
+ *
+ * @param <T>
+ * @author peter
+ */
+class ReferenceProcessor<T> implements ReferenceQueuingFactory<T, Reference<T>> {
+
+ private final Collection<Reference<T>> col;
+ private final ReferenceQueue<T> queue;
+ private final Ref type;
+ private final Lock queueLock;
+
+ ReferenceProcessor(Collection<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
+ if (col == null || type == null ) throw new NullPointerException("collection or reference type cannot be null");
+ this.col = col;
+ this.type = type;
+ this.queue = type == Ref.STRONG ? null : queue;
+ queueLock = new ReentrantLock();
+ }
+
+ @Override
+ public T pseudoReferent(Reference<T> u) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public Reference<T> referenced(T w, boolean enque) {
+ if (w == null) return null;
+ return ReferenceFactory.create(w, enque == true ? queue : null, type);
+ }
+
+ @Override
+ public void processQueue() {
+ if (queue == null) return;
+ Object t = null;
+ /*
+ * The reason for using an explicit lock is if another thread is
+ * removing the garbage, we don't want to prevent all other threads
+ * accessing the underlying collection, when it blocks on poll,
+ * this means that some client threads will receive null values
+ * on occassion, but this is a small price to pay.
+ * Might have to employ the null object pattern.
+ */
+ if ( queueLock.tryLock()){
+ try {
+ while ( (t = queue.poll()) != null){
+ col.remove(t);
+ }
+ }finally{
+ queueLock.unlock();
+ }
+ }
+ }
+}
Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueueProcessor.java Mon Oct 3 08:24:23 2011
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.river.impl.util;
-
-/**
- *
- * @author peter
- */
-interface ReferenceQueueProcessor {
-
- void processQueue();
-
-}
Added: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java?rev=1178329&view=auto
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java (added)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java Mon Oct 3 08:24:23 2011
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.util;
+
+/**
+ * An interface for processing ReferenceQueue's and encapsulating Objects
+ * in references and for making references appear as their referent.
+ *
+ * @author Peter Firmstone
+ */
+interface ReferenceQueuingFactory<O, R> {
+
+ O pseudoReferent(R u);
+
+ R referenced(O w, boolean enque);
+
+ void processQueue();
+
+}
Propchange: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceQueuingFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSet.java Mon Oct 3 08:24:23 2011
@@ -19,8 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
@@ -37,8 +35,8 @@ class ReferenceSet<T> extends ReferenceC
super(col, type);
}
- ReferenceSet(Set<Reference<T>> col, Ref type, ReferenceQueue<T> queue){
- super(col, type, queue);
+ ReferenceSet(Set<Reference<T>> col, ReferenceQueuingFactory<T, Reference<T>> rqf){
+ super(col, rqf);
}
public boolean equals(Object o) {
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedMap.java Mon Oct 3 08:24:23 2011
@@ -19,7 +19,6 @@
package org.apache.river.impl.util;
import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
import java.util.Comparator;
import java.util.SortedMap;
@@ -31,22 +30,17 @@ import java.util.SortedMap;
*/
class ReferenceSortedMap<K,V> extends ReferenceMap<K,V> implements SortedMap<K,V>{
private SortedMap<Reference<K>, Reference<V>> map;
- private Ref keyRef;
- private Ref valRef;
ReferenceSortedMap(SortedMap<Reference<K>, Reference<V>> map, Ref keyRef, Ref valRef){
super(map, keyRef, valRef);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
- ReferenceSortedMap( SortedMap<Reference<K>, Reference<V>> map,
- Ref keyRef, Ref valRef, ReferenceQueue<K> keyQueue, ReferenceQueue<V> valQueue){
- super(map, keyRef, valRef, keyQueue, valQueue);
+ ReferenceSortedMap(SortedMap<Reference<K>, Reference<V>> map,
+ ReferenceQueuingFactory<K, Reference<K>> krqf,
+ ReferenceQueuingFactory<V, Reference<V>> vrqf){
+ super(map, krqf, vrqf);
this.map = map;
- this.keyRef = keyRef;
- this.valRef = valRef;
}
@SuppressWarnings("unchecked")
@@ -63,10 +57,8 @@ class ReferenceSortedMap<K,V> extends Re
processQueue();
return new ReferenceSortedMap<K,V>(
map.subMap(wrapKey(fromKey, false), wrapKey(toKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
@@ -75,10 +67,8 @@ class ReferenceSortedMap<K,V> extends Re
processQueue();
return new ReferenceSortedMap<K,V>(
map.headMap(wrapKey(toKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
@@ -87,23 +77,25 @@ class ReferenceSortedMap<K,V> extends Re
processQueue();
return new ReferenceSortedMap<K,V>(
map.tailMap(wrapKey(fromKey, false)),
- keyRef,
- valRef,
- getKeyQueue(),
- getValQueue()
+ getKeyRQF(),
+ getValRQF()
);
}
public K firstKey() {
processQueue();
- return map.firstKey().get();
+ Reference<K> k = map.firstKey();
+ if (k != null) return k.get();
+ return null;
}
public K lastKey() {
processQueue();
- return map.lastKey().get();
+ Reference<K> k = map.lastKey();
+ if (k != null) return k.get();
+ return null;
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferenceSortedSet.java Mon Oct 3 08:24:23 2011
@@ -35,17 +35,15 @@ import java.util.SortedSet;
*/
class ReferenceSortedSet<T> extends ReferenceSet<T> implements SortedSet<T> {
private final SortedSet<Reference<T>> set;
- private final Ref type;
+
ReferenceSortedSet( SortedSet<Reference<T>> set, Ref type){
super(set, type);
this.set = set;
- this.type = type;
}
- ReferenceSortedSet(SortedSet<Reference<T>> set, Ref type, ReferenceQueue<T> queue){
- super(set, type, queue);
+ ReferenceSortedSet(SortedSet<Reference<T>> set, ReferenceQueuingFactory<T, Reference<T>> rqf){
+ super(set, rqf);
this.set = set;
- this.type = type;
}
@SuppressWarnings("unchecked")
@@ -62,29 +60,33 @@ class ReferenceSortedSet<T> extends Refe
processQueue();
Reference<T> from = wrapObj(fromElement, false);
Reference<T> to = wrapObj(toElement, false);
- return new ReferenceSortedSet<T>( set.subSet(from, to), type, getQueue());
+ return new ReferenceSortedSet<T>( set.subSet(from, to), getRQF());
}
public SortedSet<T> headSet(T toElement) {
processQueue();
Reference<T> to = wrapObj(toElement, false);
- return new ReferenceSortedSet<T>(set.headSet(to), type, getQueue());
+ return new ReferenceSortedSet<T>(set.headSet(to), getRQF());
}
public SortedSet<T> tailSet(T fromElement) {
processQueue();
Reference<T> from = wrapObj(fromElement, false);
- return new ReferenceSortedSet<T>(set.tailSet(from), type, getQueue());
+ return new ReferenceSortedSet<T>(set.tailSet(from), getRQF());
}
public T first() {
processQueue();
- return set.first().get();
+ Reference<T> t = set.first();
+ if ( t != null ) return t.get();
+ return null;
}
public T last() {
processQueue();
- return set.last().get();
+ Reference<T> t = set.last();
+ if ( t != null ) return t.get();
+ return null;
}
}
Modified: river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java?rev=1178329&r1=1178328&r2=1178329&view=diff
==============================================================================
--- river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java (original)
+++ river/jtsk/skunk/peterConcurrentPolicy/src/org/apache/river/impl/util/ReferencedQueue.java Mon Oct 3 08:24:23 2011
@@ -28,12 +28,10 @@ import java.util.Queue;
*/
public class ReferencedQueue<T> extends ReferenceCollection<T> implements Queue<T> {
private final Queue<Reference<T>> queue;
- private final Ref type;
public ReferencedQueue( Queue<Reference<T>> queue, Ref type){
super(queue, type);
this.queue = queue;
- this.type = type;
}
public boolean offer(T e) {
processQueue();
@@ -43,22 +41,30 @@ public class ReferencedQueue<T> extends
public T remove() {
processQueue();
- return queue.remove().get();
+ Reference<T> t = queue.remove();
+ if ( t != null ) return t.get();
+ return null;
}
public T poll() {
processQueue();
- return queue.poll().get();
+ Reference<T> t = queue.poll();
+ if ( t != null ) return t.get();
+ return null;
}
public T element() {
processQueue();
- return queue.element().get();
+ Reference<T> t = queue.element();
+ if ( t != null ) return t.get();
+ return null;
}
public T peek() {
processQueue();
- return queue.peek().get();
+ Reference<T> t = queue.peek();
+ if ( t != null ) return t.get();
+ return null;
}
}