You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2020/07/05 11:41:42 UTC
svn commit: r1879521 [30/37] - in
/river/jtsk/modules/modularize/apache-river: ./ browser/
browser/src/main/java/org/apache/river/example/browser/ extra/
groovy-config/ river-activation/ river-collections/
river-collections/src/main/java/org/apache/riv...
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java Sun Jul 5 11:41:39 2020
@@ -1,307 +1,310 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.WeakHashMap;
-import net.jini.core.transaction.TransactionException;
-import net.jini.id.Uuid;
-import net.jini.space.InternalSpaceException;
-
-/**
- * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code>
- * queries. Resolves with the first matching transition where the
- * entry is visible to the associated transaction and the entry is
- * still available, or of the locked entry set goes empty.
- */
-class TakeIfExistsWatcher extends SingletonQueryWatcher
- implements IfExistsWatcher, Transactable
-{
- /**
- * The set of entries that would match but are currently
- * unavailable (e.g. they are locked). We only keep
- * the ids, not the entries themselves.
- */
- private final Set<Uuid> lockedEntries;
-
- /**
- * Set <code>true</code> once the query thread is
- * done processing the backlog. Once this is
- * <code>true</code> it is ok to resolve if
- * <code>lockedEntries</code> is empty.
- */
- private boolean backlogFinished = false;
-
- /**
- * If non-null the transaction this query is
- * being performed under. If <code>null</code>
- * this query is not associated with a transaction.
- */
- private final Txn txn;
-
- /**
- * Set of entries (represented by <code>EntryHolder</code>s) that
- * we would have liked to return, but have been provisionally
- * removed.
- */
- private final Set<EntryHandle> provisionallyRemovedEntrySet;
-
- /**
- * Create a new <code>TakeIfExistsWatcher</code>.
- * @param expiration the initial expiration time
- * for this <code>TransitionWatcher</code> in
- * milliseconds since the beginning of the epoch.
- * @param timestamp the value that is used
- * to sort <code>TransitionWatcher</code>s.
- * @param startOrdinal the highest ordinal associated
- * with operations that are considered to have occurred
- * before the operation associated with this watcher.
- * @param lockedEntries Set of entries (by their IDs)
- * that match but are unavailable. Must be non-empty.
- * Keeps a reference to this object.
- * @param provisionallyRemovedEntrySet If the watcher encounters
- * an entry that can not be read/taken because it has been
- * provisionally removed then its handle will be placed in
- * this <code>WeakHashMap</code> as a key (with null as the
- * value). May be <code>null</code> in which case
- * provisionally removed entries will not be
- * recorded. Ensures that object is only accessed by one
- * thread at a time
- * @param txn If the query is being performed under
- * a transaction the <code>Txn</code> object
- * associated with that transaction.
- * @throws NullPointerException if <code>lockedEntries</code> is
- * <code>null</code>.
- */
- TakeIfExistsWatcher(long expiration, long timestamp,
- long startOrdinal, Set<Uuid> lockedEntries,
- Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
- {
- super(expiration, timestamp, startOrdinal);
-
- if (lockedEntries == null)
- throw new NullPointerException("lockedEntries must be non-null");
-
- this.lockedEntries = lockedEntries;
- this.txn = txn;
- this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet;
- }
-
- boolean isInterested(EntryTransition transition, long ordinal) {
- /* If we are unresolved pretty much all transitions are
- * interesting because we may need to update
- * lockedEntries.
- *
- * Note, !isResolved() without the lock will result only in
- * false positives, not false negatives - it will only
- * cause isInterested() to return false if we are resolved,
- * we may still return true if we are resolved though.
- */
- return (ordinal>startOrdinal) && !isResolved();
- }
-
- synchronized void process(EntryTransition transition, long now) {
- if (isResolved())
- return; // Already done.
-
- final EntryHandle handle = transition.getHandle();
- final EntryRep rep = handle.rep();
- final boolean isAvailable = transition.isAvailable();
- final TransactableMgr transitionTxn = transition.getTxn();
-
- /* If it at one time it was available to our transaction
- * it may still be, try to get it.
- */
- if (isAvailable &&
- ((null == transitionTxn) || txn == transitionTxn)) {
- /* Is it still available? */
- if (getServer().attemptCapture(handle, txn, true, null,
- provisionallyRemovedEntrySet, now, this))
- {
- // Got it
- resolve(handle, null);
- } else {
- /* Must not have been able to get it. Either
- * locked under a conflicting lock, in which
- * case it should be in our lockedEntries set, or
- * it has been removed, in which it still needs
- * to be in our lockedEntries since it may have
- * been replaced before being removed.
- */
- lockedEntries.add(rep.id());
- }
- } else if (isAvailable) { // but it is not visible to txn
- /* If we are here then at one time it must have been was
- * available but not visible to us, implying that it was an
- * entry written under a transaction and is interesting to
- * us, but not yet visible. We need to add it lockedEntries
- * even if has been removed since it could have gotten
- * replaced before it was removed. If we did not
- * add it we would be acting on the future.
- */
- lockedEntries.add(rep.id());
- } else {
- /* Must not be available, transition must mark
- * the resolution of the transaction in such away
- * that the entry has been removed, remove it
- * from the set and see if that makes the set empty.
- */
- lockedEntries.remove(rep.id());
- if (backlogFinished && lockedEntries.isEmpty())
- resolve(null, null);
- }
- }
-
- synchronized boolean catchUp(EntryTransition transition, long now) {
- if (isResolved())
- return true; // Already done.
-
- final EntryHandle handle = transition.getHandle();
- final EntryRep rep = handle.rep();
- final boolean isAvailable = transition.isAvailable();
- final TransactableMgr transitionTxn = transition.getTxn();
-
- /* If it at one time it was available to our transaction
- * it may still be, try to get it.
- */
- if (isAvailable &&
- ((null == transitionTxn) || txn == transitionTxn)) {
- /* Is it still available? Try to get it. attemptCapture will
- * add the entry to lockedEntries for us if we could not
- * get it and it is still in the space (but locked).
- * Nothing will be added if it has been removed outright.
- * This is ok even though we are peaking into the future -
- * we won't act on that information until the future
- * comes to pass.
- */
- if (getServer().attemptCapture(handle, txn, true,
- lockedEntries, provisionallyRemovedEntrySet, now, this))
- {
- // Got it
- resolve(handle, null);
- return true;
- }
-
- // did not resolve
- return false;
- }
-
- if (isAvailable) { // but it is not visible to txn
- /* If we are here then at one time it must have been was
- * available but not visible to us, implying that it was
- * an entry written under a transaction and is interesting
- * to us, but not yet visible. We only add if it has not
- * already been removed. It might have gotten replaced
- * before removal, but since we won't let this query get
- * resolved with a definitive null before we get past the
- * point in the journal where it was removed it is ok to
- * never put it in (and if we did put it in it might never
- * get removed since process() may have already processed
- * the removal). We don't need to check to see it the
- * entry has been provisionally removed since if has been
- * provisional removal does not put in entries in the
- * journal and if it is provisionally removed it has not
- * yet been removed so the remove recored has not yet been
- * created (must less processed).
- */
- synchronized (handle) {
- if (!handle.removed()) {
- lockedEntries.add(rep.id());
- }
-
- /* If it has been removed, there is no way it
- * will be interesting to us again, ever.
- */
- }
- // Either way, still not resolved.
- return false;
- }
-
- /* Must not be available, transition must mark
- * the resolution of the transaction in such away
- * that the entry has been removed, remove it
- * from the set (don't need to check for empty
- * because we haven't gotten to the point
- * where we can resolve with a definitive null.)
- */
- lockedEntries.remove(rep.id());
- return false;
- }
-
- /**
- * Once the backlog is complete we can resolve if
- * lockedEntries is/becomes empty.
- */
- public synchronized void caughtUp() {
- backlogFinished = true;
-
- if (isResolved())
- return; // Don't much mater.
-
- if (lockedEntries.isEmpty())
- resolve(null, null);
- }
-
- public synchronized boolean isLockedEntrySetEmpty() {
- if (!isResolved())
- throw new IllegalStateException("Query not yet resolved");
- return lockedEntries.isEmpty();
- }
-
- /**
- * If a transaction ends in the middle of a query we want
- * to throw an exception to the client making the query
- * not the <code>Txn</code> calling us here.)
- */
- public synchronized int prepare(TransactableMgr mgr,
- OutriggerServerImpl space)
- {
- assert txn != null:"Transactable method called on a " +
- "non-transactional TakeIfExistsWatcher";
-
- // only throw an exception if we are not resolved.
- if (!isResolved()) {
- // Query still in progress, kill it
- resolve(null, new TransactionException("completed while " +
- "operation in progress"));
- }
-
- // If this object has made changes they have been recorded elsewhere
- return NOTCHANGED;
- }
-
- /**
- * This should never happen since we always return
- * <code>NOTCHANGED</code> from <code>prepare</code>.
- */
- public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
- throw new InternalSpaceException("committing a blocking query");
-
- }
-
- /**
- * If a transaction ends in the middle of a query we want
- * to throw an exception to the client making the query
- * (not the <code>Txn</code> calling us here.)
- */
- public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
- // prepare does the right thing, and should forever
- prepare(mgr, space);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import net.jini.core.transaction.TransactionException;
+import net.jini.id.Uuid;
+import net.jini.space.InternalSpaceException;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+
+/**
+ * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code>
+ * queries. Resolves with the first matching transition where the
+ * entry is visible to the associated transaction and the entry is
+ * still available, or of the locked entry set goes empty.
+ */
+class TakeIfExistsWatcher extends SingletonQueryWatcher
+ implements IfExistsWatcher, Transactable
+{
+ /**
+ * The set of entries that would match but are currently
+ * unavailable (e.g. they are locked). We only keep
+ * the ids, not the entries themselves.
+ */
+ private final Set<Uuid> lockedEntries;
+
+ /**
+ * Set <code>true</code> once the query thread is
+ * done processing the backlog. Once this is
+ * <code>true</code> it is ok to resolve if
+ * <code>lockedEntries</code> is empty.
+ */
+ private boolean backlogFinished = false;
+
+ /**
+ * If non-null the transaction this query is
+ * being performed under. If <code>null</code>
+ * this query is not associated with a transaction.
+ */
+ private final Txn txn;
+
+ /**
+ * Set of entries (represented by <code>EntryHolder</code>s) that
+ * we would have liked to return, but have been provisionally
+ * removed.
+ */
+ private final Set<EntryHandle> provisionallyRemovedEntrySet;
+
+ /**
+ * Create a new <code>TakeIfExistsWatcher</code>.
+ * @param expiration the initial expiration time
+ * for this <code>TransitionWatcher</code> in
+ * milliseconds since the beginning of the epoch.
+ * @param timestamp the value that is used
+ * to sort <code>TransitionWatcher</code>s.
+ * @param startOrdinal the highest ordinal associated
+ * with operations that are considered to have occurred
+ * before the operation associated with this watcher.
+ * @param lockedEntries Set of entries (by their IDs)
+ * that match but are unavailable. Must be non-empty.
+ * Keeps a reference to this object.
+ * @param provisionallyRemovedEntrySet If the watcher encounters
+ * an entry that can not be read/taken because it has been
+ * provisionally removed then its handle will be placed in
+ * this <code>WeakHashMap</code> as a key (with null as the
+ * value). May be <code>null</code> in which case
+ * provisionally removed entries will not be
+ * recorded. Ensures that object is only accessed by one
+ * thread at a time
+ * @param txn If the query is being performed under
+ * a transaction the <code>Txn</code> object
+ * associated with that transaction.
+ * @throws NullPointerException if <code>lockedEntries</code> is
+ * <code>null</code>.
+ */
+ TakeIfExistsWatcher(long expiration, long timestamp,
+ long startOrdinal, Set<Uuid> lockedEntries,
+ Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
+ {
+ super(expiration, timestamp, startOrdinal);
+
+ if (lockedEntries == null)
+ throw new NullPointerException("lockedEntries must be non-null");
+
+ this.lockedEntries = lockedEntries;
+ this.txn = txn;
+ this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet;
+ }
+
+ boolean isInterested(EntryTransition transition, long ordinal) {
+ /* If we are unresolved pretty much all transitions are
+ * interesting because we may need to update
+ * lockedEntries.
+ *
+ * Note, !isResolved() without the lock will result only in
+ * false positives, not false negatives - it will only
+ * cause isInterested() to return false if we are resolved,
+ * we may still return true if we are resolved though.
+ */
+ return (ordinal>startOrdinal) && !isResolved();
+ }
+
+ synchronized void process(EntryTransition transition, long now) {
+ if (isResolved())
+ return; // Already done.
+
+ final EntryHandle handle = transition.getHandle();
+ final EntryRep rep = handle.rep();
+ final boolean isAvailable = transition.isAvailable();
+ final TransactableMgr transitionTxn = transition.getTxn();
+
+ /* If it at one time it was available to our transaction
+ * it may still be, try to get it.
+ */
+ if (isAvailable &&
+ ((null == transitionTxn) || txn == transitionTxn)) {
+ /* Is it still available? */
+ if (getServer().attemptCapture(handle, txn, true, null,
+ provisionallyRemovedEntrySet, now, this))
+ {
+ // Got it
+ resolve(handle, null);
+ } else {
+ /* Must not have been able to get it. Either
+ * locked under a conflicting lock, in which
+ * case it should be in our lockedEntries set, or
+ * it has been removed, in which it still needs
+ * to be in our lockedEntries since it may have
+ * been replaced before being removed.
+ */
+ lockedEntries.add(rep.id());
+ }
+ } else if (isAvailable) { // but it is not visible to txn
+ /* If we are here then at one time it must have been was
+ * available but not visible to us, implying that it was an
+ * entry written under a transaction and is interesting to
+ * us, but not yet visible. We need to add it lockedEntries
+ * even if has been removed since it could have gotten
+ * replaced before it was removed. If we did not
+ * add it we would be acting on the future.
+ */
+ lockedEntries.add(rep.id());
+ } else {
+ /* Must not be available, transition must mark
+ * the resolution of the transaction in such away
+ * that the entry has been removed, remove it
+ * from the set and see if that makes the set empty.
+ */
+ lockedEntries.remove(rep.id());
+ if (backlogFinished && lockedEntries.isEmpty())
+ resolve(null, null);
+ }
+ }
+
+ synchronized boolean catchUp(EntryTransition transition, long now) {
+ if (isResolved())
+ return true; // Already done.
+
+ final EntryHandle handle = transition.getHandle();
+ final EntryRep rep = handle.rep();
+ final boolean isAvailable = transition.isAvailable();
+ final TransactableMgr transitionTxn = transition.getTxn();
+
+ /* If it at one time it was available to our transaction
+ * it may still be, try to get it.
+ */
+ if (isAvailable &&
+ ((null == transitionTxn) || txn == transitionTxn)) {
+ /* Is it still available? Try to get it. attemptCapture will
+ * add the entry to lockedEntries for us if we could not
+ * get it and it is still in the space (but locked).
+ * Nothing will be added if it has been removed outright.
+ * This is ok even though we are peaking into the future -
+ * we won't act on that information until the future
+ * comes to pass.
+ */
+ if (getServer().attemptCapture(handle, txn, true,
+ lockedEntries, provisionallyRemovedEntrySet, now, this))
+ {
+ // Got it
+ resolve(handle, null);
+ return true;
+ }
+
+ // did not resolve
+ return false;
+ }
+
+ if (isAvailable) { // but it is not visible to txn
+ /* If we are here then at one time it must have been was
+ * available but not visible to us, implying that it was
+ * an entry written under a transaction and is interesting
+ * to us, but not yet visible. We only add if it has not
+ * already been removed. It might have gotten replaced
+ * before removal, but since we won't let this query get
+ * resolved with a definitive null before we get past the
+ * point in the journal where it was removed it is ok to
+ * never put it in (and if we did put it in it might never
+ * get removed since process() may have already processed
+ * the removal). We don't need to check to see it the
+ * entry has been provisionally removed since if has been
+ * provisional removal does not put in entries in the
+ * journal and if it is provisionally removed it has not
+ * yet been removed so the remove recored has not yet been
+ * created (must less processed).
+ */
+ synchronized (handle) {
+ if (!handle.removed()) {
+ lockedEntries.add(rep.id());
+ }
+
+ /* If it has been removed, there is no way it
+ * will be interesting to us again, ever.
+ */
+ }
+ // Either way, still not resolved.
+ return false;
+ }
+
+ /* Must not be available, transition must mark
+ * the resolution of the transaction in such away
+ * that the entry has been removed, remove it
+ * from the set (don't need to check for empty
+ * because we haven't gotten to the point
+ * where we can resolve with a definitive null.)
+ */
+ lockedEntries.remove(rep.id());
+ return false;
+ }
+
+ /**
+ * Once the backlog is complete we can resolve if
+ * lockedEntries is/becomes empty.
+ */
+ public synchronized void caughtUp() {
+ backlogFinished = true;
+
+ if (isResolved())
+ return; // Don't much mater.
+
+ if (lockedEntries.isEmpty())
+ resolve(null, null);
+ }
+
+ public synchronized boolean isLockedEntrySetEmpty() {
+ if (!isResolved())
+ throw new IllegalStateException("Query not yet resolved");
+ return lockedEntries.isEmpty();
+ }
+
+ /**
+ * If a transaction ends in the middle of a query we want
+ * to throw an exception to the client making the query
+ * not the <code>Txn</code> calling us here.)
+ */
+ public synchronized int prepare(TransactableMgr mgr,
+ OutriggerServerImpl space)
+ {
+ assert txn != null:"Transactable method called on a " +
+ "non-transactional TakeIfExistsWatcher";
+
+ // only throw an exception if we are not resolved.
+ if (!isResolved()) {
+ // Query still in progress, kill it
+ resolve(null, new TransactionException("completed while " +
+ "operation in progress"));
+ }
+
+ // If this object has made changes they have been recorded elsewhere
+ return NOTCHANGED;
+ }
+
+ /**
+ * This should never happen since we always return
+ * <code>NOTCHANGED</code> from <code>prepare</code>.
+ */
+ public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
+ throw new InternalSpaceException("committing a blocking query");
+
+ }
+
+ /**
+ * If a transaction ends in the middle of a query we want
+ * to throw an exception to the client making the query
+ * (not the <code>Txn</code> calling us here.)
+ */
+ public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
+ // prepare does the right thing, and should forever
+ prepare(mgr, space);
+ }
+}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java Sun Jul 5 11:41:39 2020
@@ -1,232 +1,234 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-/**
- * <code>TemplateHandle</code> associates one or more
- * <code>TransitionWatcher</code>s with a template.
- * Unless otherwise noted all methods are thread safe.
- */
-class TemplateHandle extends BaseHandle {
-
- /**
- * The watchers. We use a <code>HashSet</code> because we will
- * probably do a fair number of removals for each traversal and
- * the number of watchers managed by one <code>TemplateHandle</code>
- * will probably never get very large. If this does become an
- * issue making <code>TransitionWatcher</code> extend
- * <code>FastList.Node</code> and using a <code>FastList</code>
- * here would probably be a good choice (though that would require
- * changing <code>FastList</code> to support overlapping traversals
- * of different lists from the same thread.)
- */
- final private Set<TransitionWatcher> watchers
- = Collections.newSetFromMap(
- new ConcurrentHashMap<TransitionWatcher,Boolean>());
- /**
- * WriteLock guarantees that no updates can be performed during a
- * removal operation.
- */
- final private WriteLock wl;
- final private ReadLock rl;
- private boolean removed; // mutate with wl, read with rl
-
- /**
- * The <code>WatchersForTemplateClass</code> this object
- * belongs to.
- */
- final private OutriggerServerImpl owner;
-
- /**
- * Create a handle for the template <code>tmpl</code>.
- */
- TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) {
- super(tmpl, content);
- this.owner = owner;
- ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- wl = rwl.writeLock();
- rl = rwl.readLock();
- }
-
- /**
- * Return the description for the given field count.
- */
- EntryHandleTmplDesc descFor(int index) {
- return EntryHandle.descFor(rep(), index);
- }
-
- /**
- * Return <code>true</code> if this template matches the given entry.
- */
- boolean matches(EntryRep entry) {
- return rep().matches(entry);
- }
-
- /**
- * Add a watcher to this handle. Assumes that the handle has not
- * been removed.
- * @param watcher the watcher to be added.
- * @return true if watcher is added, false otherwise.
- * @throws NullPointerException if watcher is <code>null</code>.
- */
- boolean addTransitionWatcher(TransitionWatcher watcher) {
- if (watcher == null)
- throw new NullPointerException("Watcher can not be null");
- rl.lock();
- try {
- if (removed) return false;
- if (watcher.addTemplateHandle(this)) {
- return watchers.add(watcher);
- }
- return false;
- } finally {
- rl.unlock();
- }
- }
-
- /**
- * Remote a watcher from this handle. Does nothing
- * if the specified watcher is not associated with
- * this <code>TemplateHandle</code>.
- * @param watcher the watcher to be removed.
- * @throws NullPointerException if watcher is <code>null</code>.
- */
- void removeTransitionWatcher(TransitionWatcher watcher) {
- if (watcher == null)
- throw new NullPointerException("Watcher can not be null");
- rl.lock();
- try {
- watchers.remove(watcher);
- } finally {
- rl.unlock();
- }
- }
-
- /**
- * Iterate over the watchers associated with
- * this handle calling <code>isInterested</code> on each
- * and if it returns <code>true</code> adding the watcher to the
- * passed set.
- *
- * @param set The set to accumulate interested watchers
- * into.
- * @param transition The transition being processed.
- * @param ordinal The ordinal associated with <code>transition</code>.
- * @throws NullPointerException if either argument is <code>null</code>.
- */
- void collectInterested(Set<TransitionWatcher> set, EntryTransition transition,
- long ordinal)
- {
- rl.lock();
- try {
- if (removed) return;
- final Iterator i = watchers.iterator();
- while (i.hasNext()) {
- final TransitionWatcher w = (TransitionWatcher)i.next();
- if (w.isInterested(transition, ordinal)) {
- set.add(w);
- }
- }
- } finally {
- rl.unlock();
- }
- }
-
- /**
- * Return the <code>OutriggerServerImpl</code> this
- * handle is part of.
- * @return The <code>OutriggerServerImpl</code> this
- * handle is part of.
- */
- OutriggerServerImpl getServer() {
- return owner;
- }
-
- /**
- * Visit each <code>TransitionWatcher</code> and check to see if
- * it has expired, removing it if it has.
- * @param now an estimate of the current time expressed as
- * milliseconds since the beginning of the epoch.
- */
- void reap(long now) {
- rl.lock();
- try{
- Iterator<TransitionWatcher> it = watchers.iterator();
- while (it.hasNext()){
- it.next().removeIfExpired(now);
- }
- } finally {
- rl.unlock();
- }
- }
-
-
- /**
- * Need to lock on the wl so no one will
- * add a watcher between the check for empty and
- * when it gets removed.
- */
- boolean removeIfEmpty(){
- wl.lock();
- try {
- if (watchers.isEmpty()) {
- return remove();
- }
- return false;
- } finally {
- wl.unlock();
- }
- }
-
- @Override
- public boolean removed() {
- rl.lock();
- try {
- return removed;
- } finally {
- rl.unlock();
- }
- }
-
- @Override
- public boolean remove() {
- wl.lock();
- try {
- if (removed){
- return false; // already removed.
- } else {
- removed = super.remove();
- return removed;
- }
- } finally {
- wl.unlock();
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+/**
+ * <code>TemplateHandle</code> associates one or more
+ * <code>TransitionWatcher</code>s with a template.
+ * Unless otherwise noted all methods are thread safe.
+ */
+class TemplateHandle extends BaseHandle {
+
+ /**
+ * The watchers. We use a <code>HashSet</code> because we will
+ * probably do a fair number of removals for each traversal and
+ * the number of watchers managed by one <code>TemplateHandle</code>
+ * will probably never get very large. If this does become an
+ * issue making <code>TransitionWatcher</code> extend
+ * <code>FastList.Node</code> and using a <code>FastList</code>
+ * here would probably be a good choice (though that would require
+ * changing <code>FastList</code> to support overlapping traversals
+ * of different lists from the same thread.)
+ */
+ final private Set<TransitionWatcher> watchers
+ = Collections.newSetFromMap(
+ new ConcurrentHashMap<TransitionWatcher,Boolean>());
+ /**
+ * WriteLock guarantees that no updates can be performed during a
+ * removal operation.
+ */
+ final private WriteLock wl;
+ final private ReadLock rl;
+ private boolean removed; // mutate with wl, read with rl
+
+ /**
+ * The <code>WatchersForTemplateClass</code> this object
+ * belongs to.
+ */
+ final private OutriggerServerImpl owner;
+
+ /**
+ * Create a handle for the template <code>tmpl</code>.
+ */
+ TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) {
+ super(tmpl, content);
+ this.owner = owner;
+ ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+ wl = rwl.writeLock();
+ rl = rwl.readLock();
+ }
+
+ /**
+ * Return the description for the given field count.
+ */
+ EntryHandleTmplDesc descFor(int index) {
+ return EntryHandle.descFor(rep(), index);
+ }
+
+ /**
+ * Return <code>true</code> if this template matches the given entry.
+ */
+ boolean matches(EntryRep entry) {
+ return rep().matches(entry);
+ }
+
+ /**
+ * Add a watcher to this handle. Assumes that the handle has not
+ * been removed.
+ * @param watcher the watcher to be added.
+ * @return true if watcher is added, false otherwise.
+ * @throws NullPointerException if watcher is <code>null</code>.
+ */
+ boolean addTransitionWatcher(TransitionWatcher watcher) {
+ if (watcher == null)
+ throw new NullPointerException("Watcher can not be null");
+ rl.lock();
+ try {
+ if (removed) return false;
+ if (watcher.addTemplateHandle(this)) {
+ return watchers.add(watcher);
+ }
+ return false;
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ /**
+ * Remote a watcher from this handle. Does nothing
+ * if the specified watcher is not associated with
+ * this <code>TemplateHandle</code>.
+ * @param watcher the watcher to be removed.
+ * @throws NullPointerException if watcher is <code>null</code>.
+ */
+ void removeTransitionWatcher(TransitionWatcher watcher) {
+ if (watcher == null)
+ throw new NullPointerException("Watcher can not be null");
+ rl.lock();
+ try {
+ watchers.remove(watcher);
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ /**
+ * Iterate over the watchers associated with
+ * this handle calling <code>isInterested</code> on each
+ * and if it returns <code>true</code> adding the watcher to the
+ * passed set.
+ *
+ * @param set The set to accumulate interested watchers
+ * into.
+ * @param transition The transition being processed.
+ * @param ordinal The ordinal associated with <code>transition</code>.
+ * @throws NullPointerException if either argument is <code>null</code>.
+ */
+ void collectInterested(Set<TransitionWatcher> set, EntryTransition transition,
+ long ordinal)
+ {
+ rl.lock();
+ try {
+ if (removed) return;
+ final Iterator i = watchers.iterator();
+ while (i.hasNext()) {
+ final TransitionWatcher w = (TransitionWatcher)i.next();
+ if (w.isInterested(transition, ordinal)) {
+ set.add(w);
+ }
+ }
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ /**
+ * Return the <code>OutriggerServerImpl</code> this
+ * handle is part of.
+ * @return The <code>OutriggerServerImpl</code> this
+ * handle is part of.
+ */
+ OutriggerServerImpl getServer() {
+ return owner;
+ }
+
+ /**
+ * Visit each <code>TransitionWatcher</code> and check to see if
+ * it has expired, removing it if it has.
+ * @param now an estimate of the current time expressed as
+ * milliseconds since the beginning of the epoch.
+ */
+ void reap(long now) {
+ rl.lock();
+ try{
+ Iterator<TransitionWatcher> it = watchers.iterator();
+ while (it.hasNext()){
+ it.next().removeIfExpired(now);
+ }
+ } finally {
+ rl.unlock();
+ }
+ }
+
+
+ /**
+ * Need to lock on the wl so no one will
+ * add a watcher between the check for empty and
+ * when it gets removed.
+ */
+ boolean removeIfEmpty(){
+ wl.lock();
+ try {
+ if (watchers.isEmpty()) {
+ return remove();
+ }
+ return false;
+ } finally {
+ wl.unlock();
+ }
+ }
+
+ @Override
+ public boolean removed() {
+ rl.lock();
+ try {
+ return removed;
+ } finally {
+ rl.unlock();
+ }
+ }
+
+ @Override
+ public boolean remove() {
+ wl.lock();
+ try {
+ if (removed){
+ return false; // already removed.
+ } else {
+ removed = super.remove();
+ return removed;
+ }
+ } finally {
+ wl.unlock();
+ }
+ }
+}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java Sun Jul 5 11:41:39 2020
@@ -1,323 +1,326 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.WeakHashMap;
-import net.jini.core.transaction.TransactionException;
-import net.jini.id.Uuid;
-import net.jini.space.InternalSpaceException;
-
-/**
- * Subclass of <code>QueryWatcher</code> for and transactional
- * <code>readIfExists</code> queries. Resolves with the first
- * matching transition where the entry is visible to the associated
- * transaction and the entry is still available, or of the locked
- * entry set goes empty.
- */
-class TransactableReadIfExistsWatcher extends SingletonQueryWatcher
- implements IfExistsWatcher, Transactable
-{
- /**
- * The set of entries that would match but are currently
- * unavailable (e.g. they are locked). We only keep
- * the ids, not the entries themselves.
- */
- private final Set<Uuid> lockedEntries;
-
- /**
- * Set <code>true</code> once the query thread is
- * done processing the backlog. Once this is
- * <code>true</code> it is ok to resolve if
- * <code>lockedEntries</code> is empty.
- */
- private boolean backlogFinished = false;
-
- /**
- * The transaction this query is
- * being performed under.
- */
- private final Txn txn;
-
- /**
- * Set of entries (represented by <code>EntryHolder</code>s) that
- * we would have liked to return, but have been provisionally
- * removed.
- */
- private final Set<EntryHandle> provisionallyRemovedEntrySet;
-
- /**
- * Create a new <code>TransactableReadIfExistsWatcher</code>.
- * @param expiration the initial expiration time
- * for this <code>TransitionWatcher</code> in
- * milliseconds since the beginning of the epoch.
- * @param timestamp the value that is used
- * to sort <code>TransitionWatcher</code>s.
- * @param startOrdinal the highest ordinal associated
- * with operations that are considered to have occurred
- * before the operation associated with this watcher.
- * @param lockedEntries Set of entries (by their IDs)
- * that match but are unavailable. Must be non-empty.
- * Keeps a reference to this object.
- * @param provisionallyRemovedEntrySet If the watcher encounters
- * an entry that can not be read/taken because it has been
- * provisionally removed then its handle will be placed in
- * this <code>WeakHashMap</code> as a key (with null as the
- * value). May be <code>null</code> in which case
- * provisionally removed entries will not be
- * recorded. Ensures that object is only accessed by one
- * thread at a time
- * @param txn If the query is being performed under
- * a transaction the <code>Txn</code> object
- * associated with that transaction.
- * @throws NullPointerException if <code>lockedEntries</code> or
- * <code>txn</code> is <code>null</code>.
- */
- TransactableReadIfExistsWatcher(long expiration, long timestamp,
- long startOrdinal, Set<Uuid> lockedEntries,
- Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
- {
- super(expiration, timestamp, startOrdinal);
-
- if (lockedEntries == null)
- throw new NullPointerException("lockedEntries must be non-null");
-
- if (txn == null)
- throw new NullPointerException("txn must be non-null");
-
- this.lockedEntries = lockedEntries;
- this.txn = txn;
- this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet;
- }
-
- boolean isInterested(EntryTransition transition, long ordinal) {
- /* If we are unresolved pretty much all transitions are
- * interesting because we may need to update
- * lockedEntries. The only exception is read locks being
- * resolved. It is important that transitions triggered by the
- * release of read locks get filtered out - otherwise process
- * could end up adding and removing elements to lockedEntries
- * when it shouldn't.
- *
- * Note, !isResolved() without the lock will result only in
- * false positives, not false negatives - it will only
- * cause isInterested() to return false if we are resolved,
- * we may still return true if we are resolved though.
- */
- if (!transition.isVisible() && transition.isAvailable()) {
- /* must be a transition triggered by a read lock release,
- * wont change anything so ignore it.
- */
- return false;
- }
-
- return (ordinal>startOrdinal) && !isResolved();
- }
-
- synchronized void process(EntryTransition transition, long now) {
- if (isResolved())
- return; // Already done.
-
- final EntryHandle handle = transition.getHandle();
- final EntryRep rep = handle.rep();
- final boolean isVisible = transition.isVisible();
- final TransactableMgr transitionTxn = transition.getTxn();
-
- /* If it at one time it was available to our transaction
- * it may still be, try to get it.
- */
- if (isVisible &&
- ((null == transitionTxn) || txn == transitionTxn)) {
- /* Is it still available? */
- if (getServer().attemptCapture(handle, txn, false, null,
- provisionallyRemovedEntrySet, now, this))
- {
- // Got it
- resolve(handle, null);
- } else {
- /* Must not have been able to get it. Either
- * locked under a conflicting lock, in which
- * case it should be in our lockedEntries set, or
- * it has been removed, in which it still needs
- * to be in our lockedEntries since it may have
- * been replaced before being removed.
- */
- lockedEntries.add(rep.id());
- }
- } else if (isVisible) { // but it is not visible to txn
- /* If we are here then at one time it must have been was
- * visible but not visible to us, implying that it was an
- * entry written under a transaction and is interesting to
- * us, but not yet visible. We need to add it lockedEntries
- * even if has been removed since it could have gotten
- * replaced before it was removed. If we did not
- * add it we would be acting on the future.
- */
- lockedEntries.add(rep.id());
- } else {
- /* Must not be available, transition must mark
- * the resolution of the transaction in such away
- * that the entry has been removed, remove it
- * from the set and see if that makes the set empty.
- */
- lockedEntries.remove(rep.id());
- if (backlogFinished && lockedEntries.isEmpty())
- resolve(null, null);
- }
- }
-
- synchronized boolean catchUp(EntryTransition transition, long now) {
- if (isResolved())
- return true; // Already done.
-
- final EntryHandle handle = transition.getHandle();
- final EntryRep rep = handle.rep();
- final boolean isVisible = transition.isVisible();
- final TransactableMgr transitionTxn = transition.getTxn();
-
- /* Was this the resolution of a read lock? if so ignore */
- if (!isVisible && transition.isAvailable())
- return false;
-
- /* If it at one time it was available to our transaction
- * it may still be, try to get it.
- */
- if (isVisible &&
- ((null == transitionTxn) || txn == transitionTxn)) {
- /* Is it still visible? Try to get it. attemptCapture will
- * add the entry to lockedEntries for us if we could not
- * get it and it is still in the space (but locked).
- * Nothing will be added if it has been removed outright.
- * This is ok even though we are peaking into the future -
- * we won't act on that information until the future
- * comes to pass.
- */
- if (getServer().attemptCapture(handle, txn, false,
- lockedEntries, provisionallyRemovedEntrySet, now, this))
- {
- // Got it
- resolve(handle, null);
- return true;
- }
-
- // did not resolve
- return false;
- }
-
- if (isVisible) { // but it is not visible to txn
- /* If we are here then at one time it must have been was
- * visible but not visible to us, implying that it was
- * an entry written under a transaction and is interesting
- * to us, but not yet visible. We only add if it has not
- * already been removed. It might have gotten replaced
- * before removal, but since we won't let this query get
- * resolved with a definitive null before we get past the
- * point in the journal where it was removed it is ok to
- * never put it in (and if we did put it in it might never
- * get removed since process() may have already processed
- * the removal). We don't need to check to see it the
- * entry has been provisionally removed since if has been
- * provisional removal does not put in entries in the
- * journal and if it is provisionally removed it has not
- * yet been removed so the remove recored has not yet been
- * created (must less processed).
- */
- synchronized (handle) {
- if (!handle.removed()) {
- lockedEntries.add(rep.id());
- }
-
- /* If it has been removed, there is no way it
- * will be interesting to us again, ever.
- */
- }
- // Either way, still not resolved.
- return false;
- }
-
- /* Must not be visible (and because of the first test can't be
- * available either - that is transition can't just be the
- * release of a read lock), transition must mark the
- * resolution of the transaction in such away that the entry
- * has been removed, remove it from the set (don't need to
- * check for empty because we haven't gotten to the point
- * where we can resolve with a definitive null.)
- */
- lockedEntries.remove(rep.id());
- return false;
- }
-
- /**
- * Once the backlog is complete we can resolve if
- * lockedEntries is/becomes empty.
- */
- public synchronized void caughtUp() {
- backlogFinished = true;
-
- if (isResolved())
- return; // Don't much mater.
-
- if (lockedEntries.isEmpty())
- resolve(null, null);
- }
-
- public synchronized boolean isLockedEntrySetEmpty() {
- if (!isResolved())
- throw new IllegalStateException("Query not yet resolved");
- return lockedEntries.isEmpty();
- }
-
- /**
- * If a transaction ends in the middle of a query we want
- * to throw an exception to the client making the query
- * not the <code>Txn</code> calling us here.)
- */
- public synchronized int prepare(TransactableMgr mgr,
- OutriggerServerImpl space)
- {
- // only throw an exception if we are not resolved.
- if (!isResolved()) {
- // Query still in progress, kill it
- resolve(null, new TransactionException("completed while " +
- "operation in progress"));
- }
-
- // If this object has made changes they have been recorded elsewhere
- return NOTCHANGED;
- }
-
- /**
- * This should never happen since we always return
- * <code>NOTCHANGED</code> from <code>prepare</code>.
- */
- public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
- throw new InternalSpaceException("committing a blocking query");
-
- }
-
- /**
- * If a transaction ends in the middle of a query we want
- * to throw an exception to the client making the query
- * (not the <code>Txn</code> calling us here.)
- */
- public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
- // prepare does the right thing, and should forever
- prepare(mgr, space);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import net.jini.core.transaction.TransactionException;
+import net.jini.id.Uuid;
+import net.jini.space.InternalSpaceException;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+
+/**
+ * Subclass of <code>QueryWatcher</code> for and transactional
+ * <code>readIfExists</code> queries. Resolves with the first
+ * matching transition where the entry is visible to the associated
+ * transaction and the entry is still available, or of the locked
+ * entry set goes empty.
+ */
+class TransactableReadIfExistsWatcher extends SingletonQueryWatcher
+ implements IfExistsWatcher, Transactable
+{
+ /**
+ * The set of entries that would match but are currently
+ * unavailable (e.g. they are locked). We only keep
+ * the ids, not the entries themselves.
+ */
+ private final Set<Uuid> lockedEntries;
+
+ /**
+ * Set <code>true</code> once the query thread is
+ * done processing the backlog. Once this is
+ * <code>true</code> it is ok to resolve if
+ * <code>lockedEntries</code> is empty.
+ */
+ private boolean backlogFinished = false;
+
+ /**
+ * The transaction this query is
+ * being performed under.
+ */
+ private final Txn txn;
+
+ /**
+ * Set of entries (represented by <code>EntryHolder</code>s) that
+ * we would have liked to return, but have been provisionally
+ * removed.
+ */
+ private final Set<EntryHandle> provisionallyRemovedEntrySet;
+
+ /**
+ * Create a new <code>TransactableReadIfExistsWatcher</code>.
+ * @param expiration the initial expiration time
+ * for this <code>TransitionWatcher</code> in
+ * milliseconds since the beginning of the epoch.
+ * @param timestamp the value that is used
+ * to sort <code>TransitionWatcher</code>s.
+ * @param startOrdinal the highest ordinal associated
+ * with operations that are considered to have occurred
+ * before the operation associated with this watcher.
+ * @param lockedEntries Set of entries (by their IDs)
+ * that match but are unavailable. Must be non-empty.
+ * Keeps a reference to this object.
+ * @param provisionallyRemovedEntrySet If the watcher encounters
+ * an entry that can not be read/taken because it has been
+ * provisionally removed then its handle will be placed in
+ * this <code>WeakHashMap</code> as a key (with null as the
+ * value). May be <code>null</code> in which case
+ * provisionally removed entries will not be
+ * recorded. Ensures that object is only accessed by one
+ * thread at a time
+ * @param txn If the query is being performed under
+ * a transaction the <code>Txn</code> object
+ * associated with that transaction.
+ * @throws NullPointerException if <code>lockedEntries</code> or
+ * <code>txn</code> is <code>null</code>.
+ */
+ TransactableReadIfExistsWatcher(long expiration, long timestamp,
+ long startOrdinal, Set<Uuid> lockedEntries,
+ Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn)
+ {
+ super(expiration, timestamp, startOrdinal);
+
+ if (lockedEntries == null)
+ throw new NullPointerException("lockedEntries must be non-null");
+
+ if (txn == null)
+ throw new NullPointerException("txn must be non-null");
+
+ this.lockedEntries = lockedEntries;
+ this.txn = txn;
+ this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet;
+ }
+
+ boolean isInterested(EntryTransition transition, long ordinal) {
+ /* If we are unresolved pretty much all transitions are
+ * interesting because we may need to update
+ * lockedEntries. The only exception is read locks being
+ * resolved. It is important that transitions triggered by the
+ * release of read locks get filtered out - otherwise process
+ * could end up adding and removing elements to lockedEntries
+ * when it shouldn't.
+ *
+ * Note, !isResolved() without the lock will result only in
+ * false positives, not false negatives - it will only
+ * cause isInterested() to return false if we are resolved,
+ * we may still return true if we are resolved though.
+ */
+ if (!transition.isVisible() && transition.isAvailable()) {
+ /* must be a transition triggered by a read lock release,
+ * wont change anything so ignore it.
+ */
+ return false;
+ }
+
+ return (ordinal>startOrdinal) && !isResolved();
+ }
+
+ synchronized void process(EntryTransition transition, long now) {
+ if (isResolved())
+ return; // Already done.
+
+ final EntryHandle handle = transition.getHandle();
+ final EntryRep rep = handle.rep();
+ final boolean isVisible = transition.isVisible();
+ final TransactableMgr transitionTxn = transition.getTxn();
+
+ /* If it at one time it was available to our transaction
+ * it may still be, try to get it.
+ */
+ if (isVisible &&
+ ((null == transitionTxn) || txn == transitionTxn)) {
+ /* Is it still available? */
+ if (getServer().attemptCapture(handle, txn, false, null,
+ provisionallyRemovedEntrySet, now, this))
+ {
+ // Got it
+ resolve(handle, null);
+ } else {
+ /* Must not have been able to get it. Either
+ * locked under a conflicting lock, in which
+ * case it should be in our lockedEntries set, or
+ * it has been removed, in which it still needs
+ * to be in our lockedEntries since it may have
+ * been replaced before being removed.
+ */
+ lockedEntries.add(rep.id());
+ }
+ } else if (isVisible) { // but it is not visible to txn
+ /* If we are here then at one time it must have been was
+ * visible but not visible to us, implying that it was an
+ * entry written under a transaction and is interesting to
+ * us, but not yet visible. We need to add it lockedEntries
+ * even if has been removed since it could have gotten
+ * replaced before it was removed. If we did not
+ * add it we would be acting on the future.
+ */
+ lockedEntries.add(rep.id());
+ } else {
+ /* Must not be available, transition must mark
+ * the resolution of the transaction in such away
+ * that the entry has been removed, remove it
+ * from the set and see if that makes the set empty.
+ */
+ lockedEntries.remove(rep.id());
+ if (backlogFinished && lockedEntries.isEmpty())
+ resolve(null, null);
+ }
+ }
+
+ synchronized boolean catchUp(EntryTransition transition, long now) {
+ if (isResolved())
+ return true; // Already done.
+
+ final EntryHandle handle = transition.getHandle();
+ final EntryRep rep = handle.rep();
+ final boolean isVisible = transition.isVisible();
+ final TransactableMgr transitionTxn = transition.getTxn();
+
+ /* Was this the resolution of a read lock? if so ignore */
+ if (!isVisible && transition.isAvailable())
+ return false;
+
+ /* If it at one time it was available to our transaction
+ * it may still be, try to get it.
+ */
+ if (isVisible &&
+ ((null == transitionTxn) || txn == transitionTxn)) {
+ /* Is it still visible? Try to get it. attemptCapture will
+ * add the entry to lockedEntries for us if we could not
+ * get it and it is still in the space (but locked).
+ * Nothing will be added if it has been removed outright.
+ * This is ok even though we are peaking into the future -
+ * we won't act on that information until the future
+ * comes to pass.
+ */
+ if (getServer().attemptCapture(handle, txn, false,
+ lockedEntries, provisionallyRemovedEntrySet, now, this))
+ {
+ // Got it
+ resolve(handle, null);
+ return true;
+ }
+
+ // did not resolve
+ return false;
+ }
+
+ if (isVisible) { // but it is not visible to txn
+ /* If we are here then at one time it must have been was
+ * visible but not visible to us, implying that it was
+ * an entry written under a transaction and is interesting
+ * to us, but not yet visible. We only add if it has not
+ * already been removed. It might have gotten replaced
+ * before removal, but since we won't let this query get
+ * resolved with a definitive null before we get past the
+ * point in the journal where it was removed it is ok to
+ * never put it in (and if we did put it in it might never
+ * get removed since process() may have already processed
+ * the removal). We don't need to check to see it the
+ * entry has been provisionally removed since if has been
+ * provisional removal does not put in entries in the
+ * journal and if it is provisionally removed it has not
+ * yet been removed so the remove recored has not yet been
+ * created (must less processed).
+ */
+ synchronized (handle) {
+ if (!handle.removed()) {
+ lockedEntries.add(rep.id());
+ }
+
+ /* If it has been removed, there is no way it
+ * will be interesting to us again, ever.
+ */
+ }
+ // Either way, still not resolved.
+ return false;
+ }
+
+ /* Must not be visible (and because of the first test can't be
+ * available either - that is transition can't just be the
+ * release of a read lock), transition must mark the
+ * resolution of the transaction in such away that the entry
+ * has been removed, remove it from the set (don't need to
+ * check for empty because we haven't gotten to the point
+ * where we can resolve with a definitive null.)
+ */
+ lockedEntries.remove(rep.id());
+ return false;
+ }
+
+ /**
+ * Once the backlog is complete we can resolve if
+ * lockedEntries is/becomes empty.
+ */
+ public synchronized void caughtUp() {
+ backlogFinished = true;
+
+ if (isResolved())
+ return; // Don't much mater.
+
+ if (lockedEntries.isEmpty())
+ resolve(null, null);
+ }
+
+ public synchronized boolean isLockedEntrySetEmpty() {
+ if (!isResolved())
+ throw new IllegalStateException("Query not yet resolved");
+ return lockedEntries.isEmpty();
+ }
+
+ /**
+ * If a transaction ends in the middle of a query we want
+ * to throw an exception to the client making the query
+ * not the <code>Txn</code> calling us here.)
+ */
+ public synchronized int prepare(TransactableMgr mgr,
+ OutriggerServerImpl space)
+ {
+ // only throw an exception if we are not resolved.
+ if (!isResolved()) {
+ // Query still in progress, kill it
+ resolve(null, new TransactionException("completed while " +
+ "operation in progress"));
+ }
+
+ // If this object has made changes they have been recorded elsewhere
+ return NOTCHANGED;
+ }
+
+ /**
+ * This should never happen since we always return
+ * <code>NOTCHANGED</code> from <code>prepare</code>.
+ */
+ public void commit(TransactableMgr mgr, OutriggerServerImpl space) {
+ throw new InternalSpaceException("committing a blocking query");
+
+ }
+
+ /**
+ * If a transaction ends in the middle of a query we want
+ * to throw an exception to the client making the query
+ * (not the <code>Txn</code> calling us here.)
+ */
+ public void abort(TransactableMgr mgr, OutriggerServerImpl space) {
+ // prepare does the right thing, and should forever
+ prepare(mgr, space);
+ }
+}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java Sun Jul 5 11:41:39 2020
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.io.IOException;
-import javax.security.auth.login.LoginException;
-import net.jini.config.ConfigurationException;
-import org.apache.river.start.LifeCycle;
-
-/**
- * <code>OutriggerServerWrapper</code> subclass for
- * transient servers.
- *
- * @author Sun Microsystems, Inc.
- * @since 2.0
- */
-class TransientOutriggerImpl extends OutriggerServerWrapper {
- /**
- * Create a new transient outrigger server.
- * @param configArgs set of strings to be used to obtain a
- * <code>Configuration</code>.
- * @param lifeCycle the object to notify when this
- * service is destroyed.
- * @throws IOException if there is problem exporting the server.
- * @throws ConfigurationException if the <code>Configuration</code> is
- * malformed.
- * @throws LoginException if the <code>loginContext</code> specified
- * in the configuration is non-null and throws
- * an exception when login is attempted.
- */
- TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle)
- throws IOException, ConfigurationException, LoginException
- {
- super(configArgs, lifeCycle, false);
- allowCalls();
- }
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.io.IOException;
+import javax.security.auth.login.LoginException;
+import net.jini.config.ConfigurationException;
+import org.apache.river.start.lifecycle.LifeCycle;
+
+/**
+ * <code>OutriggerServerWrapper</code> subclass for
+ * transient servers.
+ *
+ * @author Sun Microsystems, Inc.
+ * @since 2.0
+ */
+class TransientOutriggerImpl extends OutriggerServerWrapper {
+ /**
+ * Create a new transient outrigger server.
+ * @param configArgs set of strings to be used to obtain a
+ * <code>Configuration</code>.
+ * @param lifeCycle the object to notify when this
+ * service is destroyed.
+ * @throws IOException if there is problem exporting the server.
+ * @throws ConfigurationException if the <code>Configuration</code> is
+ * malformed.
+ * @throws LoginException if the <code>loginContext</code> specified
+ * in the configuration is non-null and throws
+ * an exception when login is attempted.
+ */
+ TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle)
+ throws IOException, ConfigurationException, LoginException
+ {
+ super(configArgs, lifeCycle, false);
+ allowCalls();
+ }
+}
+
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java Sun Jul 5 11:41:39 2020
@@ -1,183 +1,185 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import java.util.Map;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Given an <code>EntryHandle</code> who's entry is making a
- * visibility transition this class will find all the
- * <code>TransitionWatcher</code>s who are interested in that
- * transition. The <code>TransitionWatcher</code>s are organized
- * into groups using <code>TemplateHandle</code>. Each
- * <code>TemplateHandle</code> aggregates a number of watchers
- * all interested in the same template.
- *
- * @see TransitionWatcher
- * @author Sun Microsystems, Inc.
- */
-class TransitionWatchers {
- /**
- * A map from class names to <code>WatchersForTemplateClass</code>
- * objects
- */
- final private ConcurrentMap<String,WatchersForTemplateClass> holders
- = new ConcurrentHashMap<String,WatchersForTemplateClass>();
-
- /** The server we are working for */
- final private OutriggerServerImpl server;
-
- /**
- * Create a new <code>TransitionWatchers</code> object
- * for the specified server.
- * @param server The server the new <code>TransitionWatchers</code>
- * object is working for.
- * @throws NullPointerException if <code>server</code> is
- * <code>null</code>
- */
- TransitionWatchers(OutriggerServerImpl server) {
- this(check(server), server);
- }
-
- private static boolean check(OutriggerServerImpl server) throws NullPointerException {
- if (server == null)
- throw new NullPointerException("server must be non-null");
- return true;
- }
-
- private TransitionWatchers(boolean checked, OutriggerServerImpl server){
- this.server = server;
- }
- /**
- * Add a <code>TransitionWatcher</code> to the list
- * of watchers looking for visibility transitions in
- * entries that match the specified template. Associates
- * a <code>TemplateHandle</code> using
- * <code>TransitionWatcher.setTemplateHandle</code> method.
- * <p>
- * This method is thread safe. The watcher added in this call is
- * guaranteed to be consulted by the next call to
- * <code>allMatches</code> that starts after this call completes even
- * if that call is made from another thread. Also, all of
- * of the assigned values in the calling thread's working
- * memory will be copied out to main memory as part of the
- * process of making the passed watcher visible to future
- * <code>allMatches</code> and <code>findTransitionWatcher</code> calls.
- *
- * @param watcher The <code>TransitionWatcher</code> being added.
- * @param template The <code>EntryRep</code> that represents
- * the template of interest.
- * @throws NullPointerException if either argument is
- * <code>null</code>.
- */
- void add(TransitionWatcher watcher, EntryRep template) {
- // Get/create the appropriate WatchersForTemplateClass
- final String className = template.classFor();
- WatchersForTemplateClass holder = holders.get(className);
- if (holder == null) {
- holder = new WatchersForTemplateClass(server);
- WatchersForTemplateClass existed = holders.putIfAbsent(className, holder);
- if (existed != null) holder = existed;
- }
- // Add the watcher to the WatchersForTemplateClass
- holder.add(watcher, template);
- }
-
- /**
- * Return a <code>SortedSet</code> of all the
- * <code>TransitionWatcher</code> who's <code>isInterested</code>
- * methods return <code>true</code> when asked about the specified
- * visibility transition.
- * <p>
- * This method is thread safe. This call is guaranteed to check unremoved
- * watchers that were added by <code>add</code> calls that completed
- * before this call started, even if the calls were made from
- * different threads. Before the <code>isInterested</code> method
- * of the first watcher is called the working memory of this thread
- * will be flushed so any changes made to main memory before
- * this call started will be visible.
- *
- * @param transition A <code>EntryTransition</code> that
- * describes the transition and what
- * entry is transitioning. This method
- * will assume that <code>transition.getHandle</code>
- * returns a non-null value.
- * @param ordinal The ordinal associated with <code>transition</code>.
- * @return A new <code>SortedSet</code> of all the
- * <code>TransitionWatcher</code>s interested in the specified
- * visibility transition. If none are interested an empty
- * map will be returned.
- * @throws NullPointerException if <code>transition</code> is
- * <code>null</code>.
- */
- SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) {
- final EntryRep rep = transition.getHandle().rep();
- final SortedSet<TransitionWatcher> rslt = new java.util.TreeSet<TransitionWatcher>();
- final String className = rep.classFor();
- WatchersForTemplateClass holder;
-
- /* Collect all the watchers looking for the exact class of the
- * transitioned entry.
- */
- holder = holders.get(className);
-
- if (holder != null) holder.collectInterested(rslt, transition, ordinal);
-
- // Get all the templates that are super classes of className
- final String[] superclasses = rep.superclasses();
- for (int i=0; i<superclasses.length; i++) {
- holder = holders.get(superclasses[i]);
- if (holder != null)
- holder.collectInterested(rslt, transition, ordinal);
- }
-
- // Including those registered for the null template
- final String nullClass = EntryRep.matchAnyEntryRep().classFor();
- holder = holders.get(nullClass);
- if (holder!=null) holder.collectInterested(rslt, transition, ordinal);
- return rslt;
- }
-
- /**
- * Visit each <code>TransitionWatcher</code> and check to see if
- * it has expired, removing it if it has.
- */
- void reap() {
- final long now = System.currentTimeMillis();
- Iterator<WatchersForTemplateClass> watchers = holders.values().iterator();
- while (watchers.hasNext()){
- watchers.next().reap(now);
- }
- }
-
- /**
- * Return the <code>OutriggerServerImpl</code> this
- * <code>TransitionWatchers</code> object is part of.
- * @return The <code>OutriggerServerImpl</code> this
- * <code>TransitionWatchers</code> is part of.
- */
- OutriggerServerImpl getServer() {
- return server;
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import java.util.Map;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
+/**
+ * Given an <code>EntryHandle</code> who's entry is making a
+ * visibility transition this class will find all the
+ * <code>TransitionWatcher</code>s who are interested in that
+ * transition. The <code>TransitionWatcher</code>s are organized
+ * into groups using <code>TemplateHandle</code>. Each
+ * <code>TemplateHandle</code> aggregates a number of watchers
+ * all interested in the same template.
+ *
+ * @see TransitionWatcher
+ * @author Sun Microsystems, Inc.
+ */
+class TransitionWatchers {
+ /**
+ * A map from class names to <code>WatchersForTemplateClass</code>
+ * objects
+ */
+ final private ConcurrentMap<String,WatchersForTemplateClass> holders
+ = new ConcurrentHashMap<String,WatchersForTemplateClass>();
+
+ /** The server we are working for */
+ final private OutriggerServerImpl server;
+
+ /**
+ * Create a new <code>TransitionWatchers</code> object
+ * for the specified server.
+ * @param server The server the new <code>TransitionWatchers</code>
+ * object is working for.
+ * @throws NullPointerException if <code>server</code> is
+ * <code>null</code>
+ */
+ TransitionWatchers(OutriggerServerImpl server) {
+ this(check(server), server);
+ }
+
+ private static boolean check(OutriggerServerImpl server) throws NullPointerException {
+ if (server == null)
+ throw new NullPointerException("server must be non-null");
+ return true;
+ }
+
+ private TransitionWatchers(boolean checked, OutriggerServerImpl server){
+ this.server = server;
+ }
+ /**
+ * Add a <code>TransitionWatcher</code> to the list
+ * of watchers looking for visibility transitions in
+ * entries that match the specified template. Associates
+ * a <code>TemplateHandle</code> using
+ * <code>TransitionWatcher.setTemplateHandle</code> method.
+ * <p>
+ * This method is thread safe. The watcher added in this call is
+ * guaranteed to be consulted by the next call to
+ * <code>allMatches</code> that starts after this call completes even
+ * if that call is made from another thread. Also, all of
+ * of the assigned values in the calling thread's working
+ * memory will be copied out to main memory as part of the
+ * process of making the passed watcher visible to future
+ * <code>allMatches</code> and <code>findTransitionWatcher</code> calls.
+ *
+ * @param watcher The <code>TransitionWatcher</code> being added.
+ * @param template The <code>EntryRep</code> that represents
+ * the template of interest.
+ * @throws NullPointerException if either argument is
+ * <code>null</code>.
+ */
+ void add(TransitionWatcher watcher, EntryRep template) {
+ // Get/create the appropriate WatchersForTemplateClass
+ final String className = template.classFor();
+ WatchersForTemplateClass holder = holders.get(className);
+ if (holder == null) {
+ holder = new WatchersForTemplateClass(server);
+ WatchersForTemplateClass existed = holders.putIfAbsent(className, holder);
+ if (existed != null) holder = existed;
+ }
+ // Add the watcher to the WatchersForTemplateClass
+ holder.add(watcher, template);
+ }
+
+ /**
+ * Return a <code>SortedSet</code> of all the
+ * <code>TransitionWatcher</code> who's <code>isInterested</code>
+ * methods return <code>true</code> when asked about the specified
+ * visibility transition.
+ * <p>
+ * This method is thread safe. This call is guaranteed to check unremoved
+ * watchers that were added by <code>add</code> calls that completed
+ * before this call started, even if the calls were made from
+ * different threads. Before the <code>isInterested</code> method
+ * of the first watcher is called the working memory of this thread
+ * will be flushed so any changes made to main memory before
+ * this call started will be visible.
+ *
+ * @param transition A <code>EntryTransition</code> that
+ * describes the transition and what
+ * entry is transitioning. This method
+ * will assume that <code>transition.getHandle</code>
+ * returns a non-null value.
+ * @param ordinal The ordinal associated with <code>transition</code>.
+ * @return A new <code>SortedSet</code> of all the
+ * <code>TransitionWatcher</code>s interested in the specified
+ * visibility transition. If none are interested an empty
+ * map will be returned.
+ * @throws NullPointerException if <code>transition</code> is
+ * <code>null</code>.
+ */
+ SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) {
+ final EntryRep rep = transition.getHandle().rep();
+ final SortedSet<TransitionWatcher> rslt = new java.util.TreeSet<TransitionWatcher>();
+ final String className = rep.classFor();
+ WatchersForTemplateClass holder;
+
+ /* Collect all the watchers looking for the exact class of the
+ * transitioned entry.
+ */
+ holder = holders.get(className);
+
+ if (holder != null) holder.collectInterested(rslt, transition, ordinal);
+
+ // Get all the templates that are super classes of className
+ final String[] superclasses = rep.superclasses();
+ for (int i=0; i<superclasses.length; i++) {
+ holder = holders.get(superclasses[i]);
+ if (holder != null)
+ holder.collectInterested(rslt, transition, ordinal);
+ }
+
+ // Including those registered for the null template
+ final String nullClass = EntryRep.matchAnyEntryRep().classFor();
+ holder = holders.get(nullClass);
+ if (holder!=null) holder.collectInterested(rslt, transition, ordinal);
+ return rslt;
+ }
+
+ /**
+ * Visit each <code>TransitionWatcher</code> and check to see if
+ * it has expired, removing it if it has.
+ */
+ void reap() {
+ final long now = System.currentTimeMillis();
+ Iterator<WatchersForTemplateClass> watchers = holders.values().iterator();
+ while (watchers.hasNext()){
+ watchers.next().reap(now);
+ }
+ }
+
+ /**
+ * Return the <code>OutriggerServerImpl</code> this
+ * <code>TransitionWatchers</code> object is part of.
+ * @return The <code>OutriggerServerImpl</code> this
+ * <code>TransitionWatchers</code> is part of.
+ */
+ OutriggerServerImpl getServer() {
+ return server;
+ }
+
+}