You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2020/07/05 11:41:42 UTC

svn commit: r1879521 [30/37] - in /river/jtsk/modules/modularize/apache-river: ./ browser/ browser/src/main/java/org/apache/river/example/browser/ extra/ groovy-config/ river-activation/ river-collections/ river-collections/src/main/java/org/apache/riv...

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java Sun Jul  5 11:41:39 2020
@@ -1,307 +1,310 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.WeakHashMap;
-import net.jini.core.transaction.TransactionException;
-import net.jini.id.Uuid;
-import net.jini.space.InternalSpaceException;
-
-/**
- * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code>
- * queries.  Resolves with the first matching transition where the
- * entry is visible to the associated transaction and the entry is
- * still available, or of the locked entry set goes empty.
- */
-class TakeIfExistsWatcher extends SingletonQueryWatcher 
-    implements IfExistsWatcher, Transactable
-{
-    /**
-     * The set of entries that would match but are currently
-     * unavailable (e.g. they are locked). We only keep
-     * the ids, not the entries themselves.
-     */
-    private final Set<Uuid> lockedEntries;
-
-    /**
-     * Set <code>true</code> once the query thread is 
-     * done processing the backlog. Once this is 
-     * <code>true</code> it is ok to resolve if
-     * <code>lockedEntries</code> is empty.
-     */
-    private boolean backlogFinished = false;
-
-    /**
-     * If non-null the transaction this query is
-     * being performed under. If <code>null</code> 
-     * this query is not associated with a transaction.
-     */
-    private final Txn txn;
-
-    /**
-     * Set of entries (represented by <code>EntryHolder</code>s) that
-     * we would have liked to return, but have been provisionally
-     * removed.
-     */
-    private final Set<EntryHandle> provisionallyRemovedEntrySet;
-
-    /**
-     * Create a new <code>TakeIfExistsWatcher</code>.
-     * @param expiration the initial expiration time
-     *        for this <code>TransitionWatcher</code> in 
-     *        milliseconds since the beginning of the epoch.
-     * @param timestamp the value that is used
-     *        to sort <code>TransitionWatcher</code>s.
-     * @param startOrdinal the highest ordinal associated
-     *        with operations that are considered to have occurred 
-     *        before the operation associated with this watcher.
-     * @param lockedEntries Set of entries (by their IDs)
-     *        that match but are unavailable. Must be non-empty.
-     *        Keeps a reference to this object.
-     * @param provisionallyRemovedEntrySet If the watcher encounters
-     *        an entry that can not be read/taken because it has been
-     *        provisionally removed then its handle will be placed in
-     *        this <code>WeakHashMap</code> as a key (with null as the
-     *        value).  May be <code>null</code> in which case
-     *        provisionally removed entries will not be
-     *        recorded. Ensures that object is only accessed by one
-     *        thread at a time
-     * @param txn If the query is being performed under
-     *        a transaction the <code>Txn</code> object
-     *        associated with that transaction.
-     * @throws NullPointerException if <code>lockedEntries</code> is
-     *         <code>null</code>.
-     */
-    TakeIfExistsWatcher(long expiration, long timestamp, 
-	 long startOrdinal, Set<Uuid> lockedEntries, 
-         Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
-    {
-	super(expiration, timestamp, startOrdinal);
-
-	if (lockedEntries == null) 
-	    throw new NullPointerException("lockedEntries must be non-null");
-	
-	this.lockedEntries = lockedEntries;
-	this.txn = txn;
-	this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; 
-    }
-
-    boolean isInterested(EntryTransition transition, long ordinal) {
-	/* If we are unresolved pretty much all transitions are
-	 * interesting because we may need to update
-	 * lockedEntries.
-	 * 
-	 * Note, !isResolved() without the lock will result only in
-	 * false positives, not false negatives - it will only
-	 * cause isInterested() to return false if we are resolved,
-	 * we may still return true if we are resolved though.  
-	 */
-	return (ordinal>startOrdinal) && !isResolved();
-    }
-
-    synchronized void process(EntryTransition transition, long now) {
-	if (isResolved())
-	    return; // Already done.
-
-	final EntryHandle handle = transition.getHandle();
-	final EntryRep rep = handle.rep();
-	final boolean isAvailable = transition.isAvailable();
-	final TransactableMgr transitionTxn = transition.getTxn();
-
-	/* If it at one time it was available to our transaction
-	 * it may still be, try to get it.
-	 */
-	if (isAvailable &&
-	    ((null == transitionTxn) || txn == transitionTxn)) {
-	    /* Is it still available? */
-	    if (getServer().attemptCapture(handle, txn, true, null, 
-		provisionallyRemovedEntrySet, now, this)) 
-	    {
-		// Got it
-		resolve(handle, null);
-	    } else {
-		/* Must not have been able to get it. Either
-		 * locked under a conflicting lock, in which
-		 * case it should be in our lockedEntries set, or
-		 * it has been removed, in which it still needs
-		 * to be in our lockedEntries since it may have
-		 * been replaced before being removed.
-		 */
-		lockedEntries.add(rep.id());		 
-	    }
-	} else if (isAvailable) { // but it is not visible to txn
-	    /* If we are here then at one time it must have been was
-	     * available but not visible to us, implying that it was an
-	     * entry written under a transaction and is interesting to
-	     * us, but not yet visible. We need to add it lockedEntries
-	     * even if has been removed since it could have gotten
-	     * replaced before it was removed. If we did not
-	     * add it we would be acting on the future.
-	     */
-	    lockedEntries.add(rep.id());
-	} else {
-	    /* Must not be available, transition must mark
-	     * the resolution of the transaction in such away
-	     * that the entry has been removed, remove it 
-	     * from the set and see if that makes the set empty.
-	     */
-	    lockedEntries.remove(rep.id());
-	    if (backlogFinished && lockedEntries.isEmpty())
-		resolve(null, null);
-	}
-    }
-
-    synchronized boolean catchUp(EntryTransition transition, long now) {
-	if (isResolved())
-	    return true; // Already done.
-
-	final EntryHandle handle = transition.getHandle();
-	final EntryRep rep = handle.rep();
-	final boolean isAvailable = transition.isAvailable();
-	final TransactableMgr transitionTxn = transition.getTxn();
-
-	/* If it at one time it was available to our transaction
-	 * it may still be, try to get it.
-	 */
-	if (isAvailable &&
-	    ((null == transitionTxn) || txn == transitionTxn)) {
-	    /* Is it still available? Try to get it. attemptCapture will
-	     * add the entry to lockedEntries for us if we could not
-	     * get it and it is still in the space (but locked).
-	     * Nothing will be added if it has been removed outright. 
-	     * This is ok even though we are peaking into the future -
-	     * we won't act on that information until the future
-	     * comes to pass.
-	     */
-	    if (getServer().attemptCapture(handle, txn, true,
-		lockedEntries, provisionallyRemovedEntrySet, now, this)) 
-	    {
-		// Got it
-		resolve(handle, null);
-		return true;
-	    }
-
-	    // did not resolve
-	    return false;
-	}
-
-	if (isAvailable) { // but it is not visible to txn
-	    /* If we are here then at one time it must have been was
-	     * available but not visible to us, implying that it was
-	     * an entry written under a transaction and is interesting
-	     * to us, but not yet visible. We only add if it has not
-	     * already been removed.  It might have gotten replaced
-	     * before removal, but since we won't let this query get
-	     * resolved with a definitive null before we get past the
-	     * point in the journal where it was removed it is ok to
-	     * never put it in (and if we did put it in it might never
-	     * get removed since process() may have already processed
-	     * the removal). We don't need to check to see it the
-	     * entry has been provisionally removed since if has been
-	     * provisional removal does not put in entries in the
-	     * journal and if it is provisionally removed it has not
-	     * yet been removed so the remove recored has not yet been
-	     * created (must less processed).
-	     */
-	    synchronized (handle) {
-		if (!handle.removed()) {
-		    lockedEntries.add(rep.id());
-		}
-
-		/* If it has been removed, there is no way it
-		 * will be interesting to us again, ever.
-		 */
-	    }
-	    // Either way, still not resolved.
-	    return false;
-	}
-
-	/* Must not be available, transition must mark
-	 * the resolution of the transaction in such away
-	 * that the entry has been removed, remove it 
-	 * from the set (don't need to check for empty
-	 * because we haven't gotten to the point
-	 * where we can resolve with a definitive null.)
-	 */
-	lockedEntries.remove(rep.id());
-	return false;
-    }
-
-    /**
-     * Once the backlog is complete we can resolve if 
-     * lockedEntries is/becomes empty.
-     */
-    public synchronized void caughtUp() {
-	backlogFinished = true;
-	
-	if (isResolved())
-	    return; // Don't much mater.
-
-	if (lockedEntries.isEmpty())
-	    resolve(null, null);	
-    }
-
-    public synchronized boolean isLockedEntrySetEmpty() {
-	if (!isResolved())
-	    throw new IllegalStateException("Query not yet resolved");	
-	return lockedEntries.isEmpty();
-    }
-
-    /**
-     * If a transaction ends in the middle of a query we want
-     * to throw an exception to the client making the query
-     * not the <code>Txn</code> calling us here.)
-     */
-    public synchronized int prepare(TransactableMgr mgr,
-				    OutriggerServerImpl space) 
-    {
-	assert txn != null:"Transactable method called on a " +
-	    "non-transactional TakeIfExistsWatcher";
-
-	// only throw an exception if we are not resolved.
-	if (!isResolved()) {
-	    // Query still in progress, kill it
-	    resolve(null, new TransactionException("completed while " +
-						   "operation in progress"));
-	}
-
-	// If this object has made changes they have been recorded elsewhere
-	return NOTCHANGED;
-    }
-
-    /**
-     * This should never happen since we always return
-     * <code>NOTCHANGED</code> from <code>prepare</code>.
-     */
-    public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
-	throw new InternalSpaceException("committing a blocking query");
-			   
-    }
-
-    /**
-     * If a transaction ends in the middle of a query we want
-     * to throw an exception to the client making the query 
-     * (not the <code>Txn</code> calling us here.)
-     */
-    public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
-	// prepare does the right thing, and should forever
-	prepare(mgr, space);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import net.jini.core.transaction.TransactionException;
+import net.jini.id.Uuid;
+import net.jini.space.InternalSpaceException;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+
+/**
+ * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code>
+ * queries.  Resolves with the first matching transition where the
+ * entry is visible to the associated transaction and the entry is
+ * still available, or if the locked entry set goes empty.
+ */
+class TakeIfExistsWatcher extends SingletonQueryWatcher 
+    implements IfExistsWatcher, Transactable
+{
+    /**
+     * The set of entries that would match but are currently
+     * unavailable (e.g. they are locked). We only keep
+     * the ids, not the entries themselves.
+     */
+    private final Set<Uuid> lockedEntries;
+
+    /**
+     * Set <code>true</code> once the query thread is 
+     * done processing the backlog. Once this is 
+     * <code>true</code> it is ok to resolve if
+     * <code>lockedEntries</code> is empty.
+     */
+    private boolean backlogFinished = false;
+
+    /**
+     * If non-null the transaction this query is
+     * being performed under. If <code>null</code> 
+     * this query is not associated with a transaction.
+     */
+    private final Txn txn;
+
+    /**
+     * Set of entries (represented by <code>EntryHandle</code>s) that
+     * we would have liked to return, but have been provisionally
+     * removed.
+     */
+    private final Set<EntryHandle> provisionallyRemovedEntrySet;
+
+    /**
+     * Create a new <code>TakeIfExistsWatcher</code>.
+     * @param expiration the initial expiration time
+     *        for this <code>TransitionWatcher</code> in 
+     *        milliseconds since the beginning of the epoch.
+     * @param timestamp the value that is used
+     *        to sort <code>TransitionWatcher</code>s.
+     * @param startOrdinal the highest ordinal associated
+     *        with operations that are considered to have occurred 
+     *        before the operation associated with this watcher.
+     * @param lockedEntries Set of entries (by their IDs)
+     *        that match but are unavailable. Must be non-null.
+     *        Keeps a reference to this object.
+     * @param provisionallyRemovedEntrySet If the watcher encounters
+     *        an entry that can not be read/taken because it has been
+     *        provisionally removed then its handle will be added to
+     *        this <code>Set</code> (it is passed on to
+     *        <code>attemptCapture</code>).  May be <code>null</code>
+     *        in which case provisionally removed entries will not be
+     *        recorded. Ensures that object is only accessed by one
+     *        thread at a time.
+     * @param txn If the query is being performed under
+     *        a transaction the <code>Txn</code> object
+     *        associated with that transaction.
+     * @throws NullPointerException if <code>lockedEntries</code> is
+     *         <code>null</code>.
+     */
+    TakeIfExistsWatcher(long expiration, long timestamp, 
+	 long startOrdinal, Set<Uuid> lockedEntries, 
+         Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
+    {
+	super(expiration, timestamp, startOrdinal);
+
+	if (lockedEntries == null) 
+	    throw new NullPointerException("lockedEntries must be non-null");
+	
+	this.lockedEntries = lockedEntries;
+	this.txn = txn;
+	this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; 
+    }
+
+    boolean isInterested(EntryTransition transition, long ordinal) {
+	/* If we are unresolved pretty much all transitions are
+	 * interesting because we may need to update
+	 * lockedEntries.
+	 * 
+	 * Note, !isResolved() without the lock will result only in
+	 * false positives, not false negatives - it will only
+	 * cause isInterested() to return false if we are resolved,
+	 * we may still return true if we are resolved though.  
+	 */
+	return (ordinal>startOrdinal) && !isResolved();
+    }
+
+    synchronized void process(EntryTransition transition, long now) {
+	if (isResolved())
+	    return; // Already done.
+
+	final EntryHandle handle = transition.getHandle();
+	final EntryRep rep = handle.rep();
+	final boolean isAvailable = transition.isAvailable();
+	final TransactableMgr transitionTxn = transition.getTxn();
+
+	/* If at one time it was available to our transaction
+	 * it may still be, try to get it.
+	 */
+	if (isAvailable &&
+	    ((null == transitionTxn) || txn == transitionTxn)) {
+	    /* Is it still available? */
+	    if (getServer().attemptCapture(handle, txn, true, null, 
+		provisionallyRemovedEntrySet, now, this)) 
+	    {
+		// Got it
+		resolve(handle, null);
+	    } else {
+		/* Must not have been able to get it. Either
+		 * locked under a conflicting lock, in which
+		 * case it should be in our lockedEntries set, or
+		 * it has been removed, in which case it still needs
+		 * to be in our lockedEntries since it may have
+		 * been replaced before being removed.
+		 */
+		lockedEntries.add(rep.id());		 
+	    }
+	} else if (isAvailable) { // but it is not visible to txn
+	    /* If we are here then at one time it must have been
+	     * available but not visible to us, implying that it was an
+	     * entry written under a transaction and is interesting to
+	     * us, but not yet visible. We need to add it to lockedEntries
+	     * even if it has been removed since it could have gotten
+	     * replaced before it was removed. If we did not
+	     * add it we would be acting on the future.
+	     */
+	    lockedEntries.add(rep.id());
+	} else {
+	    /* Must not be available, transition must mark
+	     * the resolution of the transaction in such a way
+	     * that the entry has been removed, remove it 
+	     * from the set and see if that makes the set empty.
+	     */
+	    lockedEntries.remove(rep.id());
+	    if (backlogFinished && lockedEntries.isEmpty())
+		resolve(null, null);
+	}
+    }
+
+    synchronized boolean catchUp(EntryTransition transition, long now) {
+	if (isResolved())
+	    return true; // Already done.
+
+	final EntryHandle handle = transition.getHandle();
+	final EntryRep rep = handle.rep();
+	final boolean isAvailable = transition.isAvailable();
+	final TransactableMgr transitionTxn = transition.getTxn();
+
+	/* If at one time it was available to our transaction
+	 * it may still be, try to get it.
+	 */
+	if (isAvailable &&
+	    ((null == transitionTxn) || txn == transitionTxn)) {
+	    /* Is it still available? Try to get it. attemptCapture will
+	     * add the entry to lockedEntries for us if we could not
+	     * get it and it is still in the space (but locked).
+	     * Nothing will be added if it has been removed outright. 
+	     * This is ok even though we are peeking into the future -
+	     * we won't act on that information until the future
+	     * comes to pass.
+	     */
+	    if (getServer().attemptCapture(handle, txn, true,
+		lockedEntries, provisionallyRemovedEntrySet, now, this)) 
+	    {
+		// Got it
+		resolve(handle, null);
+		return true;
+	    }
+
+	    // did not resolve
+	    return false;
+	}
+
+	if (isAvailable) { // but it is not visible to txn
+	    /* If we are here then at one time it must have been
+	     * available but not visible to us, implying that it was
+	     * an entry written under a transaction and is interesting
+	     * to us, but not yet visible. We only add if it has not
+	     * already been removed.  It might have gotten replaced
+	     * before removal, but since we won't let this query get
+	     * resolved with a definitive null before we get past the
+	     * point in the journal where it was removed it is ok to
+	     * never put it in (and if we did put it in it might never
+	     * get removed since process() may have already processed
+	     * the removal). We don't need to check to see if the
+	     * entry has been provisionally removed since
+	     * provisional removal does not put entries in the
+	     * journal and if it is provisionally removed it has not
+	     * yet been removed so the remove record has not yet been
+	     * created (much less processed).
+	     */
+	    synchronized (handle) {
+		if (!handle.removed()) {
+		    lockedEntries.add(rep.id());
+		}
+
+		/* If it has been removed, there is no way it
+		 * will be interesting to us again, ever.
+		 */
+	    }
+	    // Either way, still not resolved.
+	    return false;
+	}
+
+	/* Must not be available, transition must mark
+	 * the resolution of the transaction in such a way
+	 * that the entry has been removed, remove it 
+	 * from the set (don't need to check for empty
+	 * because we haven't gotten to the point
+	 * where we can resolve with a definitive null.)
+	 */
+	lockedEntries.remove(rep.id());
+	return false;
+    }
+
+    /**
+     * Once the backlog is complete we can resolve if 
+     * lockedEntries is/becomes empty.
+     */
+    public synchronized void caughtUp() {
+	backlogFinished = true;
+	
+	if (isResolved())
+	    return; // Doesn't much matter.
+
+	if (lockedEntries.isEmpty())
+	    resolve(null, null);	
+    }
+
+    public synchronized boolean isLockedEntrySetEmpty() {
+	if (!isResolved())
+	    throw new IllegalStateException("Query not yet resolved");	
+	return lockedEntries.isEmpty();
+    }
+
+    /**
+     * If a transaction ends in the middle of a query we want
+     * to throw an exception to the client making the query
+     * (not the <code>Txn</code> calling us here.)
+     */
+    public synchronized int prepare(TransactableMgr mgr,
+				    OutriggerServerImpl space) 
+    {
+	assert txn != null:"Transactable method called on a " +
+	    "non-transactional TakeIfExistsWatcher";
+
+	// only throw an exception if we are not resolved.
+	if (!isResolved()) {
+	    // Query still in progress, kill it
+	    resolve(null, new TransactionException("completed while " +
+						   "operation in progress"));
+	}
+
+	// If this object has made changes they have been recorded elsewhere
+	return NOTCHANGED;
+    }
+
+    /**
+     * This should never happen since we always return
+     * <code>NOTCHANGED</code> from <code>prepare</code>.
+     */
+    public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
+	throw new InternalSpaceException("committing a blocking query");
+			   
+    }
+
+    /**
+     * If a transaction ends in the middle of a query we want
+     * to throw an exception to the client making the query 
+     * (not the <code>Txn</code> calling us here.)
+     */
+    public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
+	// prepare does the right thing, and should forever
+	prepare(mgr, space);
+    }
+}

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java Sun Jul  5 11:41:39 2020
@@ -1,232 +1,234 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-/**
- * <code>TemplateHandle</code> associates one or more
- * <code>TransitionWatcher</code>s with a template.
- * Unless otherwise noted all methods are thread safe.
- */
-class TemplateHandle extends BaseHandle {
-    
-    /**
-     * The watchers. We use a <code>HashSet</code> because we will
-     * probably do a fair number of removals for each traversal and
-     * the number of watchers managed by one <code>TemplateHandle</code>
-     * will probably never get very large. If this does become an
-     * issue making <code>TransitionWatcher</code> extend
-     * <code>FastList.Node</code> and using a <code>FastList</code>
-     * here would probably be a good choice (though that would require
-     * changing <code>FastList</code> to support overlapping traversals
-     * of different lists from the same thread.)
-     */
-    final private Set<TransitionWatcher> watchers 
-            = Collections.newSetFromMap(
-                        new ConcurrentHashMap<TransitionWatcher,Boolean>());
-    /**
-     * WriteLock guarantees that no updates can be performed during a 
-     * removal operation.
-     */
-    final private WriteLock wl;
-    final private ReadLock rl;
-    private boolean removed; // mutate with wl, read with rl
-
-    /**
-     * The <code>WatchersForTemplateClass</code> this object
-     * belongs to.
-     */
-    final private OutriggerServerImpl owner;
-
-    /**
-     * Create a handle for the template <code>tmpl</code>.
-     */
-    TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) {
-	super(tmpl, content);
-	this.owner = owner;
-        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
-        wl = rwl.writeLock();
-        rl = rwl.readLock();
-    }
-
-    /**
-     * Return the description for the given field count.
-     */
-    EntryHandleTmplDesc descFor(int index) {
-        return EntryHandle.descFor(rep(), index);
-    }
-
-    /**
-     * Return <code>true</code> if this template matches the given entry.
-     */
-    boolean matches(EntryRep entry) {
-	return rep().matches(entry);
-    }
-
-    /** 
-     * Add a watcher to this handle. Assumes that the handle has not
-     * been removed.
-     * @param watcher the watcher to be added.
-     * @return true if watcher is added, false otherwise.
-     * @throws NullPointerException if watcher is <code>null</code>.
-     */
-    boolean addTransitionWatcher(TransitionWatcher watcher) {
-        if (watcher == null)
-	    throw new NullPointerException("Watcher can not be null");
-        rl.lock();
-        try {
-            if (removed) return false;
-            if (watcher.addTemplateHandle(this)) {
-                return watchers.add(watcher);
-            }
-            return false;
-        } finally {
-            rl.unlock();
-        }
-    }
-
-    /**
-     * Remote a watcher from this handle. Does nothing 
-     * if the specified watcher is not associated with
-     * this <code>TemplateHandle</code>.
-     * @param watcher the watcher to be removed.
-     * @throws NullPointerException if watcher is <code>null</code>.
-     */
-    void removeTransitionWatcher(TransitionWatcher watcher) {
-	if (watcher == null)
-	    throw new NullPointerException("Watcher can not be null");
-        rl.lock();
-        try {
-            watchers.remove(watcher);
-        } finally {
-            rl.unlock();
-        }
-    }
-
-    /**
-     * Iterate over the watchers associated with 
-     * this handle calling <code>isInterested</code> on each
-     * and if it returns <code>true</code> adding the watcher to the
-     * passed set.
-     *
-     * @param set The set to accumulate interested watchers
-     *            into.
-     * @param transition The transition being processed.
-     * @param ordinal The ordinal associated with <code>transition</code>.
-     * @throws NullPointerException if either argument is <code>null</code>.
-     */
-    void collectInterested(Set<TransitionWatcher> set, EntryTransition transition,
-					long ordinal) 
-    {
-        rl.lock();
-        try {
-            if (removed) return;
-            final Iterator i = watchers.iterator();
-            while (i.hasNext()) {
-                final TransitionWatcher w = (TransitionWatcher)i.next();
-                if (w.isInterested(transition, ordinal)) {
-                    set.add(w);
-                }
-            }
-        } finally {
-            rl.unlock();
-        }
-    }
-
-    /**
-     * Return the <code>OutriggerServerImpl</code> this 
-     * handle is part of.
-     * @return The <code>OutriggerServerImpl</code> this 
-     * handle is part of.
-     */
-    OutriggerServerImpl getServer() {
-	return owner;
-    }
-
-    /**
-     * Visit each <code>TransitionWatcher</code> and check to see if
-     * it has expired, removing it if it has.
-     * @param now an estimate of the current time expressed as
-     *            milliseconds since the beginning of the epoch.
-     */
-    void reap(long now) {
-        rl.lock();
-        try{
-            Iterator<TransitionWatcher> it = watchers.iterator();
-            while (it.hasNext()){
-                it.next().removeIfExpired(now);
-            }
-        } finally {
-            rl.unlock();
-        }
-    }
-
-    
-    /**
-     * Need to lock on the wl so no one will
-     * add a watcher between the check for empty and
-     * when it gets removed.
-     */
-    boolean removeIfEmpty(){
-        wl.lock();
-        try {
-            if (watchers.isEmpty()) {
-                return remove();
-            }
-            return false;
-        } finally {
-            wl.unlock();
-        }
-    }
-
-    @Override
-    public boolean removed() {
-        rl.lock();
-        try {
-            return removed;
-        } finally {
-            rl.unlock();
-        }
-    }
-
-    @Override
-    public boolean remove() {
-        wl.lock();
-        try {
-            if (removed){
-                return false; // already removed.
-            } else {
-                removed = super.remove();
-                return removed;
-            }
-        } finally {
-            wl.unlock();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+/**
+ * <code>TemplateHandle</code> associates one or more
+ * <code>TransitionWatcher</code>s with a template.
+ * Unless otherwise noted all methods are thread safe.
+ */
+class TemplateHandle extends BaseHandle {
+    
+    /**
+     * The watchers. We use a <code>Set</code> backed by a
+     * <code>ConcurrentHashMap</code> because we will probably do a fair
+     * number of removals for each traversal and the number of watchers
+     * managed by one <code>TemplateHandle</code> will probably never get
+     * very large. If this does become an issue, making
+     * <code>TransitionWatcher</code> extend <code>FastList.Node</code> and
+     * using a <code>FastList</code> here would probably be a good choice
+     * (though that would require changing <code>FastList</code> to support
+     * overlapping traversals of different lists from the same thread.)
+     */
+    final private Set<TransitionWatcher> watchers 
+            = Collections.newSetFromMap(
+                        new ConcurrentHashMap<TransitionWatcher,Boolean>());
+    /**
+     * WriteLock guarantees that no updates can be performed during a 
+     * removal operation.
+     */
+    final private WriteLock wl;
+    final private ReadLock rl;
+    private boolean removed; // mutate with wl, read with rl
+
+    /**
+     * The <code>OutriggerServerImpl</code> this handle
+     * is part of.
+     */
+    final private OutriggerServerImpl owner;
+
+    /**
+     * Create a handle for the template <code>tmpl</code>.
+     */
+    TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) {
+	super(tmpl, content);
+	this.owner = owner;
+        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+        wl = rwl.writeLock();
+        rl = rwl.readLock();
+    }
+
+    /**
+     * Return the description for the given field count.
+     */
+    EntryHandleTmplDesc descFor(int index) {
+        return EntryHandle.descFor(rep(), index);
+    }
+
+    /**
+     * Return <code>true</code> if this template matches the given entry.
+     */
+    boolean matches(EntryRep entry) {
+	return rep().matches(entry);
+    }
+
+    /** 
+     * Add a watcher to this handle. Assumes that the handle has not
+     * been removed.
+     * @param watcher the watcher to be added.
+     * @return true if watcher is added, false otherwise.
+     * @throws NullPointerException if watcher is <code>null</code>.
+     */
+    boolean addTransitionWatcher(TransitionWatcher watcher) {
+        if (watcher == null)
+	    throw new NullPointerException("Watcher can not be null");
+        rl.lock();
+        try {
+            if (removed) return false;
+            if (watcher.addTemplateHandle(this)) {
+                return watchers.add(watcher);
+            }
+            return false;
+        } finally {
+            rl.unlock();
+        }
+    }
+
+    /**
+     * Remove a watcher from this handle. Does nothing
+     * if the specified watcher is not associated with
+     * this <code>TemplateHandle</code>.
+     * @param watcher the watcher to be removed.
+     * @throws NullPointerException if watcher is <code>null</code>.
+     */
+    void removeTransitionWatcher(TransitionWatcher watcher) {
+	if (watcher == null)
+	    throw new NullPointerException("Watcher can not be null");
+        rl.lock();
+        try {
+            watchers.remove(watcher);
+        } finally {
+            rl.unlock();
+        }
+    }
+
+    /**
+     * Iterate over the watchers associated with 
+     * this handle calling <code>isInterested</code> on each
+     * and if it returns <code>true</code> adding the watcher to the
+     * passed set.
+     *
+     * @param set The set to accumulate interested watchers
+     *            into.
+     * @param transition The transition being processed.
+     * @param ordinal The ordinal associated with <code>transition</code>.
+     * @throws NullPointerException if either argument is <code>null</code>.
+     */
+    void collectInterested(Set<TransitionWatcher> set, EntryTransition transition,
+					long ordinal) 
+    {
+        rl.lock();
+        try {
+            if (removed) return;
+            final Iterator i = watchers.iterator();
+            while (i.hasNext()) {
+                final TransitionWatcher w = (TransitionWatcher)i.next();
+                if (w.isInterested(transition, ordinal)) {
+                    set.add(w);
+                }
+            }
+        } finally {
+            rl.unlock();
+        }
+    }
+
+    /**
+     * Return the <code>OutriggerServerImpl</code> this 
+     * handle is part of.
+     * @return The <code>OutriggerServerImpl</code> this 
+     * handle is part of.
+     */
+    OutriggerServerImpl getServer() {
+	return owner;
+    }
+
+    /**
+     * Visit each <code>TransitionWatcher</code> and check to see if
+     * it has expired, removing it if it has.
+     * @param now an estimate of the current time expressed as
+     *            milliseconds since the beginning of the epoch.
+     */
+    void reap(long now) {
+        rl.lock();
+        try{
+            Iterator<TransitionWatcher> it = watchers.iterator();
+            while (it.hasNext()){
+                it.next().removeIfExpired(now);
+            }
+        } finally {
+            rl.unlock();
+        }
+    }
+
+    
+    /**
+     * Need to lock on the wl so no one will
+     * add a watcher between the check for empty and
+     * when it gets removed.
+     */
+    boolean removeIfEmpty(){
+        wl.lock();
+        try {
+            if (watchers.isEmpty()) {
+                return remove();
+            }
+            return false;
+        } finally {
+            wl.unlock();
+        }
+    }
+
+    @Override
+    public boolean removed() {
+        rl.lock();
+        try {
+            return removed;
+        } finally {
+            rl.unlock();
+        }
+    }
+
+    @Override
+    public boolean remove() {
+        wl.lock();
+        try {
+            if (removed){
+                return false; // already removed.
+            } else {
+                removed = super.remove();
+                return removed;
+            }
+        } finally {
+            wl.unlock();
+        }
+    }
+}

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java Sun Jul  5 11:41:39 2020
@@ -1,323 +1,326 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.WeakHashMap;
-import net.jini.core.transaction.TransactionException;
-import net.jini.id.Uuid;
-import net.jini.space.InternalSpaceException;
-
-/**
- * Subclass of <code>QueryWatcher</code> for and transactional
- * <code>readIfExists</code> queries.  Resolves with the first
- * matching transition where the entry is visible to the associated
- * transaction and the entry is still available, or of the locked
- * entry set goes empty.
- */
-class TransactableReadIfExistsWatcher extends SingletonQueryWatcher 
-    implements IfExistsWatcher, Transactable
-{
-    /**
-     * The set of entries that would match but are currently
-     * unavailable (e.g. they are locked). We only keep
-     * the ids, not the entries themselves.
-     */
-    private final Set<Uuid> lockedEntries;
-
-    /**
-     * Set <code>true</code> once the query thread is 
-     * done processing the backlog. Once this is 
-     * <code>true</code> it is ok to resolve if
-     * <code>lockedEntries</code> is empty.
-     */
-    private boolean backlogFinished = false;
-
-    /**
-     * The transaction this query is
-     * being performed under. 
-     */
-    private final Txn txn;
-
-    /**
-     * Set of entries (represented by <code>EntryHolder</code>s) that
-     * we would have liked to return, but have been provisionally
-     * removed.
-     */
-    private final Set<EntryHandle> provisionallyRemovedEntrySet;
-
-    /**
-     * Create a new <code>TransactableReadIfExistsWatcher</code>.
-     * @param expiration the initial expiration time
-     *        for this <code>TransitionWatcher</code> in 
-     *        milliseconds since the beginning of the epoch.
-     * @param timestamp the value that is used
-     *        to sort <code>TransitionWatcher</code>s.
-     * @param startOrdinal the highest ordinal associated
-     *        with operations that are considered to have occurred 
-     *        before the operation associated with this watcher.
-     * @param lockedEntries Set of entries (by their IDs)
-     *        that match but are unavailable. Must be non-empty.
-     *        Keeps a reference to this object.
-     * @param provisionallyRemovedEntrySet If the watcher encounters
-     *        an entry that can not be read/taken because it has been
-     *        provisionally removed then its handle will be placed in
-     *        this <code>WeakHashMap</code> as a key (with null as the
-     *        value).  May be <code>null</code> in which case
-     *        provisionally removed entries will not be
-     *        recorded. Ensures that object is only accessed by one
-     *        thread at a time
-     * @param txn If the query is being performed under
-     *        a transaction the <code>Txn</code> object
-     *        associated with that transaction.
-     * @throws NullPointerException if <code>lockedEntries</code> or 
-     *         <code>txn</code> is <code>null</code>.
-     */
-    TransactableReadIfExistsWatcher(long expiration, long timestamp, 
-	 long startOrdinal, Set<Uuid> lockedEntries, 
-         Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
-    {
-	super(expiration, timestamp, startOrdinal);
-
-	if (lockedEntries == null) 
-	    throw new NullPointerException("lockedEntries must be non-null");
-
-	if (txn == null)
-	    throw new NullPointerException("txn must be non-null");
-
-	this.lockedEntries = lockedEntries;
-	this.txn = txn;
-	this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; 
-    }
-
-    boolean isInterested(EntryTransition transition, long ordinal) {
-	/* If we are unresolved pretty much all transitions are
-	 * interesting because we may need to update
-	 * lockedEntries. The only exception is read locks being
-	 * resolved. It is important that transitions triggered by the
-	 * release of read locks get filtered out - otherwise process
-	 * could end up adding and removing elements to lockedEntries
-	 * when it shouldn't.
-	 * 
-	 * Note, !isResolved() without the lock will result only in
-	 * false positives, not false negatives - it will only
-	 * cause isInterested() to return false if we are resolved,
-	 * we may still return true if we are resolved though.  
-	 */
-	if (!transition.isVisible() && transition.isAvailable()) {
-	    /* must be a transition triggered by a read lock release,
-	     * wont change anything so ignore it.
-	     */
-	    return false;
-	}
-
-	return (ordinal>startOrdinal) && !isResolved();
-    }
-
-    synchronized void process(EntryTransition transition, long now) {
-	if (isResolved())
-	    return; // Already done.
-
-	final EntryHandle handle = transition.getHandle();
-	final EntryRep rep = handle.rep();
-	final boolean isVisible = transition.isVisible();
-	final TransactableMgr transitionTxn = transition.getTxn();
-
-	/* If it at one time it was available to our transaction
-	 * it may still be, try to get it.
-	 */
-	if (isVisible &&
-	    ((null == transitionTxn) || txn == transitionTxn)) {
-	    /* Is it still available? */
-	    if (getServer().attemptCapture(handle, txn, false, null, 
-		provisionallyRemovedEntrySet, now, this)) 
-	    {
-		// Got it
-		resolve(handle, null);
-	    } else {
-		/* Must not have been able to get it. Either
-		 * locked under a conflicting lock, in which
-		 * case it should be in our lockedEntries set, or
-		 * it has been removed, in which it still needs
-		 * to be in our lockedEntries since it may have
-		 * been replaced before being removed.
-		 */
-		lockedEntries.add(rep.id());		 
-	    }
-	} else if (isVisible) { // but it is not visible to txn
-	    /* If we are here then at one time it must have been was
-	     * visible but not visible to us, implying that it was an
-	     * entry written under a transaction and is interesting to
-	     * us, but not yet visible. We need to add it lockedEntries
-	     * even if has been removed since it could have gotten
-	     * replaced before it was removed. If we did not
-	     * add it we would be acting on the future.
-	     */
-	    lockedEntries.add(rep.id());
-	} else {
-	    /* Must not be available, transition must mark
-	     * the resolution of the transaction in such away
-	     * that the entry has been removed, remove it 
-	     * from the set and see if that makes the set empty.
-	     */
-	    lockedEntries.remove(rep.id());
-	    if (backlogFinished && lockedEntries.isEmpty())
-		resolve(null, null);
-	}
-    }
-
-    synchronized boolean catchUp(EntryTransition transition, long now) {
-	if (isResolved())
-	    return true; // Already done.
-
-	final EntryHandle handle = transition.getHandle();
-	final EntryRep rep = handle.rep();
-	final boolean isVisible = transition.isVisible();
-	final TransactableMgr transitionTxn = transition.getTxn();
-
-	/* Was this the resolution of a read lock? if so ignore */
-	if (!isVisible && transition.isAvailable())
-	    return false;
-
-	/* If it at one time it was available to our transaction
-	 * it may still be, try to get it.
-	 */
-	if (isVisible &&
-	    ((null == transitionTxn) || txn == transitionTxn)) {
-	    /* Is it still visible? Try to get it. attemptCapture will
-	     * add the entry to lockedEntries for us if we could not
-	     * get it and it is still in the space (but locked).
-	     * Nothing will be added if it has been removed outright. 
-	     * This is ok even though we are peaking into the future -
-	     * we won't act on that information until the future
-	     * comes to pass.
-	     */
-	    if (getServer().attemptCapture(handle, txn, false,
-		lockedEntries, provisionallyRemovedEntrySet, now, this)) 
-	    {
-		// Got it
-		resolve(handle, null);
-		return true;
-	    }
-
-	    // did not resolve
-	    return false;
-	}
-
-	if (isVisible) { // but it is not visible to txn
-	    /* If we are here then at one time it must have been was
-	     * visible but not visible to us, implying that it was
-	     * an entry written under a transaction and is interesting
-	     * to us, but not yet visible. We only add if it has not
-	     * already been removed.  It might have gotten replaced
-	     * before removal, but since we won't let this query get
-	     * resolved with a definitive null before we get past the
-	     * point in the journal where it was removed it is ok to
-	     * never put it in (and if we did put it in it might never
-	     * get removed since process() may have already processed
-	     * the removal). We don't need to check to see it the
-	     * entry has been provisionally removed since if has been
-	     * provisional removal does not put in entries in the
-	     * journal and if it is provisionally removed it has not
-	     * yet been removed so the remove recored has not yet been
-	     * created (must less processed).
-	     */
-	    synchronized (handle) {
-		if (!handle.removed()) {
-		    lockedEntries.add(rep.id());
-		}
-
-		/* If it has been removed, there is no way it
-		 * will be interesting to us again, ever.
-		 */
-	    }
-	    // Either way, still not resolved.
-	    return false;
-	}
-
-	/* Must not be visible (and because of the first test can't be
-	 * available either - that is transition can't just be the
-	 * release of a read lock), transition must mark the
-	 * resolution of the transaction in such away that the entry
-	 * has been removed, remove it from the set (don't need to
-	 * check for empty because we haven't gotten to the point
-	 * where we can resolve with a definitive null.)  
-	 */
-	lockedEntries.remove(rep.id());
-	return false;
-    }
-
-    /**
-     * Once the backlog is complete we can resolve if 
-     * lockedEntries is/becomes empty.
-     */
-    public synchronized void caughtUp() {
-	backlogFinished = true;
-	
-	if (isResolved())
-	    return; // Don't much mater.
-
-	if (lockedEntries.isEmpty())
-	    resolve(null, null);	
-    }
-
-    public synchronized boolean isLockedEntrySetEmpty() {
-	if (!isResolved())
-	    throw new IllegalStateException("Query not yet resolved");	
-	return lockedEntries.isEmpty();
-    }
-
-    /**
-     * If a transaction ends in the middle of a query we want
-     * to throw an exception to the client making the query
-     * not the <code>Txn</code> calling us here.)
-     */
-    public synchronized int prepare(TransactableMgr mgr,
-				    OutriggerServerImpl space) 
-    {
-	// only throw an exception if we are not resolved.
-	if (!isResolved()) {
-	    // Query still in progress, kill it
-	    resolve(null, new TransactionException("completed while " +
-						   "operation in progress"));
-	}
-
-	// If this object has made changes they have been recorded elsewhere
-	return NOTCHANGED;
-    }
-
-    /**
-     * This should never happen since we always return
-     * <code>NOTCHANGED</code> from <code>prepare</code>.
-     */
-    public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
-	throw new InternalSpaceException("committing a blocking query");
-			   
-    }
-
-    /**
-     * If a transaction ends in the middle of a query we want
-     * to throw an exception to the client making the query 
-     * (not the <code>Txn</code> calling us here.)
-     */
-    public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
-	// prepare does the right thing, and should forever
-	prepare(mgr, space);
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import net.jini.core.transaction.TransactionException;
+import net.jini.id.Uuid;
+import net.jini.space.InternalSpaceException;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+
+/**
+ * Subclass of <code>QueryWatcher</code> for transactional
+ * <code>readIfExists</code> queries.  Resolves with the first
+ * matching transition where the entry is visible to the associated
+ * transaction and the entry is still available, or if the locked
+ * entry set goes empty.
+ */
+class TransactableReadIfExistsWatcher extends SingletonQueryWatcher 
+    implements IfExistsWatcher, Transactable
+{
+    /**
+     * The set of entries that would match but are currently
+     * unavailable (e.g. they are locked). We only keep
+     * the ids, not the entries themselves.
+     */
+    private final Set<Uuid> lockedEntries;
+
+    /**
+     * Set <code>true</code> once the query thread is 
+     * done processing the backlog. Once this is 
+     * <code>true</code> it is ok to resolve if
+     * <code>lockedEntries</code> is empty.
+     */
+    private boolean backlogFinished = false;
+
+    /**
+     * The transaction this query is
+     * being performed under. 
+     */
+    private final Txn txn;
+
+    /**
+     * Set of entries (represented by <code>EntryHandle</code>s) that
+     * we would have liked to return, but have been provisionally
+     * removed.
+     */
+    private final Set<EntryHandle> provisionallyRemovedEntrySet;
+
+    /**
+     * Create a new <code>TransactableReadIfExistsWatcher</code>.
+     * @param expiration the initial expiration time
+     *        for this <code>TransitionWatcher</code> in 
+     *        milliseconds since the beginning of the epoch.
+     * @param timestamp the value that is used
+     *        to sort <code>TransitionWatcher</code>s.
+     * @param startOrdinal the highest ordinal associated
+     *        with operations that are considered to have occurred 
+     *        before the operation associated with this watcher.
+     * @param lockedEntries Set of entries (by their IDs)
+     *        that match but are unavailable. Must be non-empty.
+     *        Keeps a reference to this object.
+     * @param provisionallyRemovedEntrySet If the watcher encounters
+     *        an entry that can not be read/taken because it has been
+     *        provisionally removed then its handle will be added to
+     *        this <code>Set</code> of <code>EntryHandle</code>s.
+     *        May be <code>null</code> in which case
+     *        provisionally removed entries will not be
+     *        recorded. Ensures that object is only accessed by one
+     *        thread at a time
+     * @param txn If the query is being performed under
+     *        a transaction the <code>Txn</code> object
+     *        associated with that transaction.
+     * @throws NullPointerException if <code>lockedEntries</code> or 
+     *         <code>txn</code> is <code>null</code>.
+     */
+    TransactableReadIfExistsWatcher(long expiration, long timestamp, 
+	 long startOrdinal, Set<Uuid> lockedEntries, 
+         Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
+    {
+	super(expiration, timestamp, startOrdinal);
+
+	if (lockedEntries == null) 
+	    throw new NullPointerException("lockedEntries must be non-null");
+
+	if (txn == null)
+	    throw new NullPointerException("txn must be non-null");
+
+	this.lockedEntries = lockedEntries;
+	this.txn = txn;
+	this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; 
+    }
+
+    boolean isInterested(EntryTransition transition, long ordinal) {
+	/* If we are unresolved pretty much all transitions are
+	 * interesting because we may need to update
+	 * lockedEntries. The only exception is read locks being
+	 * resolved. It is important that transitions triggered by the
+	 * release of read locks get filtered out - otherwise process
+	 * could end up adding and removing elements to lockedEntries
+	 * when it shouldn't.
+	 * 
+	 * Note, !isResolved() without the lock will result only in
+	 * false positives, not false negatives - it will only
+	 * cause isInterested() to return false if we are resolved,
+	 * we may still return true if we are resolved though.  
+	 */
+	if (!transition.isVisible() && transition.isAvailable()) {
+	    /* must be a transition triggered by a read lock release,
+	     * wont change anything so ignore it.
+	     */
+	    return false;
+	}
+
+	return (ordinal>startOrdinal) && !isResolved();
+    }
+
+    synchronized void process(EntryTransition transition, long now) {
+	if (isResolved())
+	    return; // Already done.
+
+	final EntryHandle handle = transition.getHandle();
+	final EntryRep rep = handle.rep();
+	final boolean isVisible = transition.isVisible();
+	final TransactableMgr transitionTxn = transition.getTxn();
+
+	/* If it at one time it was available to our transaction
+	 * it may still be, try to get it.
+	 */
+	if (isVisible &&
+	    ((null == transitionTxn) || txn == transitionTxn)) {
+	    /* Is it still available? */
+	    if (getServer().attemptCapture(handle, txn, false, null, 
+		provisionallyRemovedEntrySet, now, this)) 
+	    {
+		// Got it
+		resolve(handle, null);
+	    } else {
+		/* Must not have been able to get it. Either
+		 * locked under a conflicting lock, in which
+		 * case it should be in our lockedEntries set, or
+		 * it has been removed, in which it still needs
+		 * to be in our lockedEntries since it may have
+		 * been replaced before being removed.
+		 */
+		lockedEntries.add(rep.id());		 
+	    }
+	} else if (isVisible) { // but it is not visible to txn
+	    /* If we are here then at one time it must have been
+	     * visible but not visible to us, implying that it was an
+	     * entry written under a transaction and is interesting to
+	     * us, but not yet visible. We need to add it lockedEntries
+	     * even if has been removed since it could have gotten
+	     * replaced before it was removed. If we did not
+	     * add it we would be acting on the future.
+	     */
+	    lockedEntries.add(rep.id());
+	} else {
+	    /* Must not be available, transition must mark
+	     * the resolution of the transaction in such a way
+	     * that the entry has been removed, remove it 
+	     * from the set and see if that makes the set empty.
+	     */
+	    lockedEntries.remove(rep.id());
+	    if (backlogFinished && lockedEntries.isEmpty())
+		resolve(null, null);
+	}
+    }
+
+    synchronized boolean catchUp(EntryTransition transition, long now) {
+	if (isResolved())
+	    return true; // Already done.
+
+	final EntryHandle handle = transition.getHandle();
+	final EntryRep rep = handle.rep();
+	final boolean isVisible = transition.isVisible();
+	final TransactableMgr transitionTxn = transition.getTxn();
+
+	/* Was this the resolution of a read lock? if so ignore */
+	if (!isVisible && transition.isAvailable())
+	    return false;
+
+	/* If it at one time it was available to our transaction
+	 * it may still be, try to get it.
+	 */
+	if (isVisible &&
+	    ((null == transitionTxn) || txn == transitionTxn)) {
+	    /* Is it still visible? Try to get it. attemptCapture will
+	     * add the entry to lockedEntries for us if we could not
+	     * get it and it is still in the space (but locked).
+	     * Nothing will be added if it has been removed outright. 
+	     * This is ok even though we are peeking into the future -
+	     * we won't act on that information until the future
+	     * comes to pass.
+	     */
+	    if (getServer().attemptCapture(handle, txn, false,
+		lockedEntries, provisionallyRemovedEntrySet, now, this)) 
+	    {
+		// Got it
+		resolve(handle, null);
+		return true;
+	    }
+
+	    // did not resolve
+	    return false;
+	}
+
+	if (isVisible) { // but it is not visible to txn
+	    /* If we are here then at one time it must have been
+	     * visible but not visible to us, implying that it was
+	     * an entry written under a transaction and is interesting
+	     * to us, but not yet visible. We only add if it has not
+	     * already been removed.  It might have gotten replaced
+	     * before removal, but since we won't let this query get
+	     * resolved with a definitive null before we get past the
+	     * point in the journal where it was removed it is ok to
+	     * never put it in (and if we did put it in it might never
+	     * get removed since process() may have already processed
+	     * the removal). We don't need to check to see if the
+	     * entry has been provisionally removed since
+	     * provisional removal does not put entries in the
+	     * journal and if it is provisionally removed it has not
+	     * yet been removed so the remove record has not yet been
+	     * created (much less processed).
+	     */
+	    synchronized (handle) {
+		if (!handle.removed()) {
+		    lockedEntries.add(rep.id());
+		}
+
+		/* If it has been removed, there is no way it
+		 * will be interesting to us again, ever.
+		 */
+	    }
+	    // Either way, still not resolved.
+	    return false;
+	}
+
+	/* Must not be visible (and because of the first test can't be
+	 * available either - that is transition can't just be the
+	 * release of a read lock), transition must mark the
+	 * resolution of the transaction in such a way that the entry
+	 * has been removed, remove it from the set (don't need to
+	 * check for empty because we haven't gotten to the point
+	 * where we can resolve with a definitive null.)  
+	 */
+	lockedEntries.remove(rep.id());
+	return false;
+    }
+
+    /**
+     * Once the backlog is complete we can resolve if 
+     * lockedEntries is/becomes empty.
+     */
+    public synchronized void caughtUp() {
+	backlogFinished = true;
+	
+	if (isResolved())
+	    return; // Don't much mater.
+
+	if (lockedEntries.isEmpty())
+	    resolve(null, null);	
+    }
+
+    public synchronized boolean isLockedEntrySetEmpty() {
+	if (!isResolved())
+	    throw new IllegalStateException("Query not yet resolved");	
+	return lockedEntries.isEmpty();
+    }
+
+    /**
+     * If a transaction ends in the middle of a query we want
+     * to throw an exception to the client making the query
+     * (not the <code>Txn</code> calling us here.)
+     */
+    public synchronized int prepare(TransactableMgr mgr,
+				    OutriggerServerImpl space) 
+    {
+	// only throw an exception if we are not resolved.
+	if (!isResolved()) {
+	    // Query still in progress, kill it
+	    resolve(null, new TransactionException("completed while " +
+						   "operation in progress"));
+	}
+
+	// If this object has made changes they have been recorded elsewhere
+	return NOTCHANGED;
+    }
+
+    /**
+     * This should never happen since we always return
+     * <code>NOTCHANGED</code> from <code>prepare</code>.
+     */
+    public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
+	throw new InternalSpaceException("committing a blocking query");
+			   
+    }
+
+    /**
+     * If a transaction ends in the middle of a query we want
+     * to throw an exception to the client making the query 
+     * (not the <code>Txn</code> calling us here.)
+     */
+    public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
+	// prepare does the right thing, and should forever
+	prepare(mgr, space);
+    }
+}

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java Sun Jul  5 11:41:39 2020
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.io.IOException;
-import javax.security.auth.login.LoginException;
-import net.jini.config.ConfigurationException;
-import org.apache.river.start.LifeCycle;
-
-/**
- * <code>OutriggerServerWrapper</code> subclass for
- * transient servers.
- *
- * @author Sun Microsystems, Inc.
- * @since 2.0
- */
-class TransientOutriggerImpl extends OutriggerServerWrapper {
-    /**
-     * Create a new transient outrigger server.
-     * @param configArgs set of strings to be used to obtain a
-     *                   <code>Configuration</code>.
-     * @param lifeCycle the object to notify when this
-     *                  service is destroyed.
-     * @throws IOException if there is problem exporting the server.
-     * @throws ConfigurationException if the <code>Configuration</code> is 
-     *         malformed.
-     * @throws LoginException if the <code>loginContext</code> specified
-     *         in the configuration is non-null and throws 
-     *         an exception when login is attempted.
-     */
-    TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle)
-	throws IOException, ConfigurationException, LoginException
-    {
-	super(configArgs, lifeCycle, false);
-	allowCalls();
-    }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.io.IOException;
+import javax.security.auth.login.LoginException;
+import net.jini.config.ConfigurationException;
+import org.apache.river.start.lifecycle.LifeCycle;
+
+/**
+ * <code>OutriggerServerWrapper</code> subclass for
+ * transient servers.
+ *
+ * @author Sun Microsystems, Inc.
+ * @since 2.0
+ */
+class TransientOutriggerImpl extends OutriggerServerWrapper {
+    /**
+     * Construct a transient outrigger server, then invoke
+     * <code>allowCalls</code> on it.
+     *
+     * @param configArgs strings from which the
+     *                   <code>Configuration</code> is obtained.
+     * @param lifeCycle object to be notified when this service
+     *                  is destroyed.
+     * @throws IOException if exporting the server fails.
+     * @throws ConfigurationException if the <code>Configuration</code>
+     *         is malformed.
+     * @throws LoginException if the configuration specifies a non-null
+     *         <code>loginContext</code> and attempting login throws.
+     */
+    TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle)
+	    throws IOException, ConfigurationException, LoginException {
+	super(configArgs, lifeCycle, false);
+	allowCalls();
+    }
+}
+

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java Sun Jul  5 11:41:39 2020
@@ -1,183 +1,185 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Given an <code>EntryHandle</code> who's entry is making a
- * visibility transition this class will find all the 
- * <code>TransitionWatcher</code>s who are interested in that
- * transition. The <code>TransitionWatcher</code>s are organized
- * into groups using <code>TemplateHandle</code>. Each
- * <code>TemplateHandle</code> aggregates a number of watchers
- * all interested in the same template.
- * 
- * @see TransitionWatcher
- * @author Sun Microsystems, Inc.
- */
-class TransitionWatchers {
-    /** 
-     * A map from class names to <code>WatchersForTemplateClass</code>
-     * objects 
-     */
-    final private ConcurrentMap<String,WatchersForTemplateClass> holders 
-            = new ConcurrentHashMap<String,WatchersForTemplateClass>();
-
-    /** The server we are working for */
-    final private OutriggerServerImpl server;
-
-    /**
-     * Create a new <code>TransitionWatchers</code> object
-     * for the specified server.
-     * @param server The server the new <code>TransitionWatchers</code> 
-     *               object is working for.
-     * @throws NullPointerException if <code>server</code> is 
-     *        <code>null</code>
-     */
-    TransitionWatchers(OutriggerServerImpl server) {
-	this(check(server), server);
-    }
-    
-    private static boolean check(OutriggerServerImpl server) throws NullPointerException {
-        if (server == null)
-	    throw new NullPointerException("server must be non-null");
-        return true;
-    }
-
-    private TransitionWatchers(boolean checked, OutriggerServerImpl server){
-        this.server = server;
-    }
-    /**
-     * Add a <code>TransitionWatcher</code> to the list
-     * of watchers looking for visibility transitions in
-     * entries that match the specified template. Associates
-     * a <code>TemplateHandle</code> using 
-     * <code>TransitionWatcher.setTemplateHandle</code> method.
-     * <p>
-     * This method is thread safe. The watcher added in this call is
-     * guaranteed to be consulted by the next call to
-     * <code>allMatches</code> that starts after this call completes even
-     * if that call is made from another thread. Also, all of
-     * of the assigned values in the calling thread's working
-     * memory will be copied out to main memory as part of the 
-     * process of making the passed watcher visible to future
-     * <code>allMatches</code> and <code>findTransitionWatcher</code> calls.
-     *
-     * @param watcher The <code>TransitionWatcher</code> being added.
-     * @param template The <code>EntryRep</code> that represents
-     *                 the template of interest.
-     * @throws NullPointerException if either argument is
-     *         <code>null</code>.  
-     */
-    void add(TransitionWatcher watcher, EntryRep template) {
-	// Get/create the appropriate WatchersForTemplateClass
-	final String className = template.classFor();
-	WatchersForTemplateClass holder = holders.get(className);	    
-        if (holder == null) {
-            holder = new WatchersForTemplateClass(server);
-            WatchersForTemplateClass existed = holders.putIfAbsent(className, holder);
-            if (existed != null) holder = existed;
-        }
-	// Add the watcher to the WatchersForTemplateClass
-	holder.add(watcher, template);
-    }
-
-    /**
-     * Return a <code>SortedSet</code> of all the
-     * <code>TransitionWatcher</code> who's <code>isInterested</code>
-     * methods return <code>true</code> when asked about the specified
-     * visibility transition.
-     * <p>
-     * This method is thread safe. This call is guaranteed to check unremoved
-     * watchers that were added by <code>add</code> calls that completed
-     * before this call started, even if the calls were made from
-     * different threads. Before the <code>isInterested</code> method
-     * of the first watcher is called the working memory of this thread
-     * will be flushed so any changes made to main memory before
-     * this call started will be visible.
-     * 
-     * @param transition A <code>EntryTransition</code> that
-     *              describes the transition and what
-     *              entry is transitioning. This method
-     *              will assume that <code>transition.getHandle</code>
-     *              returns a non-null value.
-     * @param ordinal The ordinal associated with <code>transition</code>.
-     * @return A new <code>SortedSet</code> of all the 
-     *         <code>TransitionWatcher</code>s interested in the specified
-     *         visibility transition. If none are interested an empty
-     *         map will be returned.
-     * @throws NullPointerException if <code>transition</code> is 
-     *         <code>null</code>.
-     */
-    SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) {
-	final EntryRep rep = transition.getHandle().rep();
-	final SortedSet<TransitionWatcher> rslt = new java.util.TreeSet<TransitionWatcher>();
-	final String className = rep.classFor();
-	WatchersForTemplateClass holder;
-	
-	/* Collect all the watchers looking for the exact class of the
-	 * transitioned entry. 
-	 */
-        holder = holders.get(className);
-
-	if (holder != null) holder.collectInterested(rslt, transition, ordinal);
-
-	// Get all the templates that are super classes of className
-	final String[] superclasses = rep.superclasses();
-	for (int i=0; i<superclasses.length; i++) {	 
-            holder = holders.get(superclasses[i]);
-	    if (holder != null)
-		holder.collectInterested(rslt, transition, ordinal);
-	}
-
-	// Including those registered for the null template
-	final String nullClass = EntryRep.matchAnyEntryRep().classFor();
-        holder = holders.get(nullClass);
-	if (holder!=null) holder.collectInterested(rslt, transition, ordinal);
-	return rslt;
-    }
-
-    /**
-     * Visit each <code>TransitionWatcher</code> and check to see if
-     * it has expired, removing it if it has. 
-     */
-    void reap() {
-	final long now = System.currentTimeMillis();
-        Iterator<WatchersForTemplateClass> watchers = holders.values().iterator();
-        while (watchers.hasNext()){
-            watchers.next().reap(now);
-        }
-    }
-
-    /**
-     * Return the <code>OutriggerServerImpl</code> this 
-     * <code>TransitionWatchers</code> object is part of.
-     * @return The <code>OutriggerServerImpl</code> this 
-     * <code>TransitionWatchers</code> is part of.
-     */
-    OutriggerServerImpl getServer() {
-	return server;
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+/**
+ * Given an <code>EntryHandle</code> whose entry is making a
+ * visibility transition this class will find all the 
+ * <code>TransitionWatcher</code>s who are interested in that
+ * transition. The <code>TransitionWatcher</code>s are organized
+ * into groups using <code>TemplateHandle</code>. Each
+ * <code>TemplateHandle</code> aggregates a number of watchers
+ * all interested in the same template.
+ * 
+ * @see TransitionWatcher
+ * @author Sun Microsystems, Inc.
+ */
+class TransitionWatchers {
+    /** Watcher groups, keyed by template class name */
+    final private ConcurrentMap<String,WatchersForTemplateClass> holders
+            = new ConcurrentHashMap<String,WatchersForTemplateClass>();
+
+    /** The server this object works on behalf of */
+    final private OutriggerServerImpl server;
+
+    /**
+     * Create a <code>TransitionWatchers</code> object bound to
+     * the given server.
+     * @param server The server the new <code>TransitionWatchers</code>
+     *               object will work for.
+     * @throws NullPointerException if <code>server</code> is
+     *        <code>null</code>
+     */
+    TransitionWatchers(OutriggerServerImpl server) {
+        this(check(server), server);
+    }
+
+    /** Returns true, or throws if <code>server</code> is null. */
+    private static boolean check(OutriggerServerImpl server) throws NullPointerException {
+        if (server != null)
+            return true;
+        throw new NullPointerException("server must be non-null");
+    }
+
+    /** <code>checked</code> is never read; only the server is stored. */
+    private TransitionWatchers(boolean checked, OutriggerServerImpl server){
+        this.server = server;
+    }
+
+    /**
+     * Register a <code>TransitionWatcher</code> under the class of
+     * the given template, so that it is among the watchers looking
+     * for visibility transitions in entries matching that template.
+     * A <code>TemplateHandle</code> is associated with the watcher
+     * through <code>TransitionWatcher.setTemplateHandle</code>.
+     * <p>
+     * This method is thread safe. Once this call completes, the
+     * watcher it added is guaranteed to be consulted by the next
+     * <code>allMatches</code> call that starts afterwards, even when
+     * that call comes from another thread. Also, all of the values
+     * assigned in the calling thread's working memory will be
+     * copied out to main memory as part of making the passed
+     * watcher visible to future <code>allMatches</code> and
+     * <code>findTransitionWatcher</code> calls.
+     *
+     * @param watcher The <code>TransitionWatcher</code> being added.
+     * @param template The <code>EntryRep</code> that represents
+     *                 the template of interest.
+     * @throws NullPointerException if either argument is
+     *         <code>null</code>.
+     */
+    void add(TransitionWatcher watcher, EntryRep template) {
+        final String key = template.classFor();
+        WatchersForTemplateClass group = holders.get(key);
+        if (group == null) {
+            // No group for this class yet: offer a new one, but use
+            // whichever group another thread may have installed first.
+            final WatchersForTemplateClass fresh = new WatchersForTemplateClass(server);
+            final WatchersForTemplateClass raced = holders.putIfAbsent(key, fresh);
+            group = (raced == null) ? fresh : raced;
+        }
+        group.add(watcher, template);
+    }
+
+    /**
+     * Build a <code>SortedSet</code> of every
+     * <code>TransitionWatcher</code> whose <code>isInterested</code>
+     * method returns <code>true</code> when asked about the given
+     * visibility transition.
+     * <p>
+     * This method is thread safe. This call is guaranteed to check
+     * unremoved watchers that were added by <code>add</code> calls
+     * that completed before this call started, even if the calls
+     * were made from different threads. Before the
+     * <code>isInterested</code> method of the first watcher is
+     * called the working memory of this thread will be flushed so
+     * any changes made to main memory before this call started
+     * will be visible.
+     *
+     * @param transition A <code>EntryTransition</code> that
+     *              describes the transition and what
+     *              entry is transitioning. This method
+     *              will assume that <code>transition.getHandle</code>
+     *              returns a non-null value.
+     * @param ordinal The ordinal associated with <code>transition</code>.
+     * @return A new <code>SortedSet</code> of all the
+     *         <code>TransitionWatcher</code>s interested in the specified
+     *         visibility transition. If none are interested an empty
+     *         set will be returned.
+     * @throws NullPointerException if <code>transition</code> is
+     *         <code>null</code>.
+     */
+    SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) {
+        final EntryRep rep = transition.getHandle().rep();
+        final SortedSet<TransitionWatcher> interested
+                = new java.util.TreeSet<TransitionWatcher>();
+        // Watchers registered for the entry's exact class ...
+        collectFrom(rep.classFor(), interested, transition, ordinal);
+        // ... then those registered for each of its superclasses ...
+        for (String superclass : rep.superclasses()) {
+            collectFrom(superclass, interested, transition, ordinal);
+        }
+        // ... and finally those registered for the null template.
+        collectFrom(EntryRep.matchAnyEntryRep().classFor(),
+                interested, transition, ordinal);
+        return interested;
+    }
+
+    /**
+     * Have the group registered under <code>className</code>, if any,
+     * add its interested watchers to <code>sink</code>.
+     */
+    private void collectFrom(String className,
+            SortedSet<TransitionWatcher> sink,
+            EntryTransition transition, long ordinal) {
+        final WatchersForTemplateClass group = holders.get(className);
+        if (group != null)
+            group.collectInterested(sink, transition, ordinal);
+    }
+
+    /**
+     * Pass the current time to every watcher group's <code>reap</code>
+     * so that <code>TransitionWatcher</code>s that have expired are removed.
+     */
+    void reap() {
+        final long now = System.currentTimeMillis();
+        for (WatchersForTemplateClass group : holders.values()) {
+            group.reap(now);
+        }
+    }
+
+    /**
+     * Return the <code>OutriggerServerImpl</code> this
+     * <code>TransitionWatchers</code> object was created for.
+     * @return The server passed to the constructor; never
+     * <code>null</code>.
+     */
+    OutriggerServerImpl getServer() {
+        return server;
+    }
+
+}