You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2020/07/05 11:41:42 UTC

svn commit: r1879521 [31/37] - in /river/jtsk/modules/modularize/apache-river: ./ browser/ browser/src/main/java/org/apache/river/example/browser/ extra/ groovy-config/ river-activation/ river-collections/ river-collections/src/main/java/org/apache/riv...

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java Sun Jul  5 11:41:39 2020
@@ -1,537 +1,540 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.logging.Levels;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.rmi.RemoteException;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.CannotJoinException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-import net.jini.core.transaction.server.TransactionManager;
-
-import net.jini.space.InternalSpaceException;
-import net.jini.security.ProxyPreparer;
-
-/**
- * This class represents a space's state in a single transaction.
- *
- * Object of this class represent Jini transactions within outrigger.
- * These transactions hold "Transactables" -- things that represent
- * the actions that have been taken under this transaction. For example, 
- * if this transaction were to be cancelled, the Transactables are
- * examined and serve as the list of things to roll back in order to 
- * restore the state of the Space status quo ante.
- *
- * This is achieved by having the transactables notified of state changes 
- * to this transaction such as preparing, commit, etc. The 
- * transactables themselves are responsible for "doing the right thing."
- *
- * NB: some--but not all--of the actions one can take with a transaction
- * are managed internally by these objects. That is, all of the important
- * methods objects of these types are synchronized. Therefore, two
- * simultaneous calls to abort() and commit() are arbitrated properly. 
- *
- * However, care must be taken when add()ing a transactable. Even
- * though the add() method is synchronized if you check the state of
- * the transaction to ensure that is active and then call add() the
- * transaction could have left the active state between the check and
- * the add() call unless the call obtains the appropriate locks. This 
- * is even more likely if other work needs to be done in addition to
- * calling add() (e.g. persisting state, obtaining locks, etc.). The
- * caller of add() should lock the associated transaction object and
- * ensure that the transaction is still considered ACTIVE, do whatever
- * work is necessary to complete while the transaction is in the ACTIVE
- * state (including calling call add()) and then release the lock.
- * This can be done by :
- * <ul>
- * <li> holding the lock on this object while checking the
- *      state and carrying out the operation (including calling add()), or
- * <li> calling ensureActive() to check the state
- *      and obtain a non-exclusive lock, carrying out the operation
- *      (including calling add()), and then calling allowStateChange() to
- *      release the lock.
- * </ul>
- * The pair of ensureActive() and allowStateChange() allows for more
- * concurrency if the operation is expected to take a long time, in
- * that it will allow for other operations to be performed under the
- * same transaction and let aborts prevent other operations from
- * being started.
- *
- * @author Sun Microsystems, Inc.  
- */
-class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
-
-    /** The internal id Outrigger as assigned to the transaction */
-    final private long id;
-
-    /** What state we think the transaction is in */
-    private volatile int state;
-
-    /** 
-     * The transaction manager associated with the transaction 
-     * this object is fronting for.
-     */
-    private StorableReference trm;
-
-    /**
-     * Cached <code>ServerTransaction</code> object for
-     * the transaction this object is fronting for.
-     */
-    private ServerTransaction tr;
-    
-    /** The id the transaction manager assigned to this transaction */
-    private volatile long  trId;
-
-    /** 
-     * The list of <code>Transactable</code> participating in 
-     * this transaction.
-     */
-    final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
-
-    /**
-     * The task responsible for monitoring to see if this
-     * transaction has been aborted with us being told, or
-     * null if no such task as been allocated.
-     */
-    private volatile TxnMonitorTask	monitorTask;
-
-    /** Count of number of threads holding a read lock on state */
-    private int stateReaders = 0;
-
-    /** 
-     * <code>true</code> if there is a blocked state change. Used
-     * to give writers priority.
-     */
-    private boolean stateChangeWaiting = false;
-
-    /** Logger for logging transaction related information */
-    private static final Logger logger = 
-	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
-    /**
-     * Create a new <code>Txn</code> that represents our state in the
-     * given <code>ServerTransaction</code>.
-     */
-    Txn(ServerTransaction tr, long id) {
-	this(id);
-	trId = tr.id;
-	this.tr = tr;
-	this.trm = new StorableReference(tr.mgr);
-	state = ACTIVE;
-
-	if (logger.isLoggable(Level.FINER)) {
-	    logger.log(Level.FINER, "creating txn for transaction mgr:" +
-		    "{0}, id:{1}, state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	}
-    }
-
-    /** Used in recovery */
-    Txn(long id) {
-	this.id = id;		// the txn id is not persisted
-    }
-
-    /**
-     * Get the id for this txn. Note that this id is NOT the same as
-     * the ID of the transaction. Since that ID is not unique (it must
-     * be qualified with the <code>ServerTransaction</code> object) we create
-     * our own internal id to make txns unique. This is needed since we
-     * may not have the <code>Transaction</code> unmarshalled.
-     */
-    Long getId() {
-	return Long.valueOf(id);
-    }
-
-    /**
-     * We keep the transaction ID around because we may need it
-     * to identify a broken transaction after recovery.
-     */
-    long getTransactionId() {
-	return trId;
-    }
-
-    /**
-     * Return our local view of the current state. Need to be holding
-     * the lock on this object or have called <code>ensureActive</code>
-     * to get the current value.
-     */
-    int getState() {
-	return state;
-    }
-
-    /**
-     * Atomically checks that this transaction is in the active
-     * state and locks the transaction in the active state.
-     * The lock can be released by calling <code>allowStateChange</code>.
-     * Each call to this method should be paired with a call to
-     * <code>allowStateChange</code> in a finally block.
-     * @throws CannotJoinException if the transaction
-     * is not active or a state change is pending.  
-     */
-    synchronized void ensureActive() throws CannotJoinException {
-	if (state != ACTIVE || stateChangeWaiting) {
-	    final String msg = "transaction mgr:" + tr + ", id:" + trId +
-		" not active, in state " + TxnConstants.getName(state);
-	    final CannotJoinException e = new CannotJoinException(msg);
-	    logger.log(Levels.FAILED, msg, e);
-	    throw e; 
-	}
-	assert stateReaders >= 0;
-	stateReaders++;
-    }
-
-    /**
-     * Release the read lock created by an <code>ensureActive</code>
-     * call. Does nothing if the transaction is not active or there is
-     * a state change pending and thus is safe to call even if the
-     * corresponding <code>ensureActive</code> call threw
-     * <code>CannotJoinException</code>.  
-     */
-    synchronized void allowStateChange() {
-	if (state != ACTIVE || stateChangeWaiting)
-	    return;
-	stateReaders--;
-	assert stateReaders >= 0;
-	notifyAll();
-    }
-
-    /**
-     * Prevents new operations from being started under this
-     * transaction and blocks until in process operations are
-     * completed.
-     */
-    synchronized void makeInactive() {
-	stateChangeWaiting = true;
-	assert stateReaders >= 0;
-	while (stateReaders != 0) {
-	    try {
-		wait();
-	    } catch (InterruptedException e) {
-		throw new AssertionError(e);
-	    }
-	    assert stateReaders >= 0;
-	}
-    }
-
-    /**
-     * Prepare for transaction commit. <code>makeInactive</code> must have 
-     * been called on this transaction first.
-     */
-    synchronized int prepare(OutriggerServerImpl space) {
-	assert stateChangeWaiting : "prepare called before makeInactive";
-	assert stateReaders == 0 : "prepare called before makeInactive completed";
-
-	if (logger.isLoggable(Level.FINER)) {
-	    logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
-		"state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	}
-
-	switch (state) {
-	  case ABORTED:			       // previously aborted
-	    return ABORTED;
-
-	  case COMMITTED:		       // previously committed
-	    throw new IllegalStateException(); // "cannot happen"
-
-	  case NOTCHANGED:		       // previously voted NOTCHANGED
-	  case PREPARED:		       // previously voted PREPARED
-	    return state;		       // they are idempotent, and
-	                                       // and we have nothing to do
-	                                       // so return
-
-	  case ACTIVE:			       // currently active
-	    boolean changed = false;	       // did this txn change
-	                                       // anything?
-
-	    if (logger.isLoggable(Level.FINEST)) {
-		logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
-		    "{0}, id:{1}, state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	    }
-
-	    // loop through Transactable members of this Txn
-	    final Iterator<Transactable> i = txnables.iterator();
-	    int c=0; // Counter for debugging message
-	    while (i.hasNext()) {
-		// get this member's vote
-		final Transactable transactable = i.next();
-		final int prepState =  transactable.prepare(this, space);
-		if (logger.isLoggable(Level.FINEST)) {
-		    logger.log(Level.FINEST, "prepare:prepared " +
-		        "transactable {0} for transaction mgr:{1}, id:{2}," +
-			" transactable now in state {3}", 
-			new Object[]{transactable, tr, Long.valueOf(trId), 
-				     TxnConstants.getName(prepState)});
-		}
-
-		switch (prepState) {
-		  case PREPARED:	     // has prepared state
-		    changed = true;	     // this means a change
-		    continue;
-
-		  case ABORTED:		     // has to abort
-		    abort(space);	     // abort this txn (does cleanup)
-		    state = ABORTED;
-		    return state;	     // vote aborted
-
-		  case NOTCHANGED:	     // no change
-		    i.remove();              // Won't need to call again
-		    continue;
-
-		  default:		     // huh?
-		    throw new
-			InternalSpaceException("prepare said " + prepState);
-		}
-	    }
-
-	    if (changed) {
-		state = PREPARED;
-		// have to watch this since it's now holding permanent 
-		// resources
-		space.monitor(Collections.nCopies(1, this));
-	    } else {
-		state = NOTCHANGED;
-	    }
-	    break;
-
-	  default:
-	    throw new IllegalStateException("unknown Txn state: " + state);
-	}
-
-	return state;
-    }
-
-    /**
-     * Abort the transaction.  This must be callable from
-     * <code>prepare</code> because if a <code>Transactable</code>
-     * votes <code>ABORT</code>, this method is called to make that
-     * happen. <code>makeInactive</code> must have been called on this
-     * transaction first.
-     */
-    synchronized void abort(OutriggerServerImpl space) {
-	assert stateChangeWaiting : "abort called before makeInactive";
-	assert stateReaders == 0 : "abort called before makeInactive completed";
-
-	if (logger.isLoggable(Level.FINER)) {
-	    logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
-		"state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	}
-
-	switch (state) {
-	  case ABORTED:		// already aborted
-	  case NOTCHANGED:	// nothing to abort
-	    break;
-
-	  case COMMITTED:	// "cannot happen"
-	    throw new IllegalStateException("aborting a committed txn");
-
-	  case ACTIVE:
-	  case PREPARED:
-	    if (logger.isLoggable(Level.FINEST)) {
-		logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
-		    "{0}, id:{1}, state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	    }
-
-	    final Iterator<Transactable> i = txnables.iterator();
-	    while (i.hasNext()) {
-                i.next().abort(this, space);
-            }
-	    state = ABORTED;
-	    cleanup();
-	    break;
-
-	  default:
-	    throw new IllegalStateException("unknown Txn state: " + state);
-	}
-    }
-
-    /**
-     * Having prepared, roll the changes
-     * forward. <code>makeInactive</code> must have been called on
-     * this transaction first.
-     */
-    synchronized void commit(OutriggerServerImpl space) {
-	assert stateChangeWaiting : "commit called before makeInactive";
-	assert stateReaders == 0 : "commit called before makeInactive completed";
-
-	//!! Need to involve mgr here
-	if (logger.isLoggable(Level.FINER)) {
-	    logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
-		"state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	}
-
-	switch (state) {
-	  case ABORTED:		// "cannot happen" stuff
-	  case ACTIVE:
-	  case NOTCHANGED:
-	    throw new IllegalStateException("committing "
-		+ TxnConstants.getName(state) + " txn");
-
-	  case COMMITTED:	// previous committed, that's okay
-	    return;
-
-	  case PREPARED:	// voted PREPARED, time to finish up
-	    final Iterator<Transactable> i = txnables.iterator();
-	    if (logger.isLoggable(Level.FINEST)) {
-		logger.log(Level.FINEST, "commit:committing transaction mgr:" +
-		    "{0}, id:{1}, state:{2}", 
-		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
-	    }
-
-	    while (i.hasNext()) {
-                i.next().commit(this, space);
-            }
-	    state = COMMITTED;
-	    cleanup();
-	    return;
-
-	  default:
-	    throw new IllegalStateException("unknown Txn state: " + state);
-	}
-    }
-
-    /**
-     * Caution: see locking discussion at the class level.
-     */
-    public synchronized Transactable add(Transactable t) {
-	txnables.add(t);
-	return t;
-    }
-
-    // inherit doc comment
-    public ServerTransaction getTransaction(ProxyPreparer preparer)
-	throws IOException, ClassNotFoundException
-    {
-        synchronized (this){
-            if (tr == null) {
-                final TransactionManager mgr = 
-                    (TransactionManager)trm.get(preparer);
-                tr = new ServerTransaction(mgr, trId);
-            }
-            return tr;
-        }
-    }
-
-    /**
-     * Return the manager associated with this transaction.
-     * @return the manager associated with this transaction.
-     * @throws IllegalStateException if this <code>Txn</code>
-     *         is still broken.
-     */
-    synchronized TransactionManager getManager() {
-	if (tr == null)
-	    throw new IllegalStateException("Txn is still broken");
-	return tr.mgr;
-    }
-
-    /**
-     * Return the monitor task for this object. Note, this
-     * method is unsynchronized because it (and 
-     * <code>monitorTask(TxnMonitorTask)</code> are both called
-     * from the same thread.
-     */
-    TxnMonitorTask monitorTask() {
-	return monitorTask;
-    }
-
-    /**
-     * Set the monitor task for this object. Note, this method is
-     * unsynchronized because it (and <code>monitorTask()</code> are
-     * both called from the same thread.
-     */
-    void monitorTask(TxnMonitorTask task) {
-	monitorTask = task;
-    }
-
-    /**
-     * Clean up any state when the transaction is finished.
-     */
-    private void cleanup() {
-	if (monitorTask != null)
-	    monitorTask.cancel(false);	    }
-
-    // -----------------------------------
-    //  Methods required by StorableObject
-    // -----------------------------------
-
-    // inherit doc comment
-    public void store(ObjectOutputStream out) throws IOException {
-	/* There is a bunch of stuff we don't need to write. The
-	 * Txn id not stored since it is handed back during
-	 * recovery. The content is rebuilt txnables by the various
-	 * recoverWrite and recoverTake calls. state is not written
-	 * because it is always ACTIVE when we write, and always
-	 * needs to be PREPARED when we read it back.
-	 */
-        synchronized (this){
-            out.writeObject(trm);
-            out.writeLong(trId);
-        }
-    }
-
-    // inherit doc comment
-    public Txn restore(ObjectInputStream in) 
-	throws IOException, ClassNotFoundException 
-    {
-	/* Only transactions that got prepared and not committed or
-	 * aborted get recovered
-	 */
-        synchronized (this){
-            state    = PREPARED;
-            trm      = (StorableReference)in.readObject();
-            trId     = in.readLong();
-        }
-        return this;
-    }
-
-    @Override
-    public int compareTo(Txn o) {
-        if (o == null) return -1;
-        if (o.id < id) return -1;
-        if (o.id > id) return 1;
-        return 0;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 7;
-        hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
-        return hash;
-    }
-    
-    public boolean equals(Object o){
-        if (!(o instanceof Txn)) return false;
-        Txn txn = (Txn) o;
-        return id == txn.id;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.logging.Levels;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.CannotJoinException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.core.transaction.server.TransactionManager;
+
+import net.jini.space.InternalSpaceException;
+import net.jini.security.ProxyPreparer;
+import org.apache.river.outrigger.proxy.StorableObject;
+
+
+
+/**
+ * This class represents a space's state in a single transaction.
+ *
+ * Object of this class represent Jini transactions within outrigger.
+ * These transactions hold "Transactables" -- things that represent
+ * the actions that have been taken under this transaction. For example, 
+ * if this transaction were to be cancelled, the Transactables are
+ * examined and serve as the list of things to roll back in order to 
+ * restore the state of the Space status quo ante.
+ *
+ * This is achieved by having the transactables notified of state changes 
+ * to this transaction such as preparing, commit, etc. The 
+ * transactables themselves are responsible for "doing the right thing."
+ *
+ * NB: some--but not all--of the actions one can take with a transaction
+ * are managed internally by these objects. That is, all of the important
+ * methods objects of these types are synchronized. Therefore, two
+ * simultaneous calls to abort() and commit() are arbitrated properly. 
+ *
+ * However, care must be taken when add()ing a transactable. Even
+ * though the add() method is synchronized if you check the state of
+ * the transaction to ensure that is active and then call add() the
+ * transaction could have left the active state between the check and
+ * the add() call unless the call obtains the appropriate locks. This 
+ * is even more likely if other work needs to be done in addition to
+ * calling add() (e.g. persisting state, obtaining locks, etc.). The
+ * caller of add() should lock the associated transaction object and
+ * ensure that the transaction is still considered ACTIVE, do whatever
+ * work is necessary to complete while the transaction is in the ACTIVE
+ * state (including calling call add()) and then release the lock.
+ * This can be done by :
+ * <ul>
+ * <li> holding the lock on this object while checking the
+ *      state and carrying out the operation (including calling add()), or
+ * <li> calling ensureActive() to check the state
+ *      and obtain a non-exclusive lock, carrying out the operation
+ *      (including calling add()), and then calling allowStateChange() to
+ *      release the lock.
+ * </ul>
+ * The pair of ensureActive() and allowStateChange() allows for more
+ * concurrency if the operation is expected to take a long time, in
+ * that it will allow for other operations to be performed under the
+ * same transaction and let aborts prevent other operations from
+ * being started.
+ *
+ * @author Sun Microsystems, Inc.  
+ */
+class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
+
+    /** The internal id Outrigger as assigned to the transaction */
+    final private long id;
+
+    /** What state we think the transaction is in */
+    private volatile int state;
+
+    /** 
+     * The transaction manager associated with the transaction 
+     * this object is fronting for.
+     */
+    private StorableReference trm;
+
+    /**
+     * Cached <code>ServerTransaction</code> object for
+     * the transaction this object is fronting for.
+     */
+    private ServerTransaction tr;
+    
+    /** The id the transaction manager assigned to this transaction */
+    private volatile long  trId;
+
+    /** 
+     * The list of <code>Transactable</code> participating in 
+     * this transaction.
+     */
+    final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
+
+    /**
+     * The task responsible for monitoring to see if this
+     * transaction has been aborted with us being told, or
+     * null if no such task as been allocated.
+     */
+    private volatile TxnMonitorTask	monitorTask;
+
+    /** Count of number of threads holding a read lock on state */
+    private int stateReaders = 0;
+
+    /** 
+     * <code>true</code> if there is a blocked state change. Used
+     * to give writers priority.
+     */
+    private boolean stateChangeWaiting = false;
+
+    /** Logger for logging transaction related information */
+    private static final Logger logger = 
+	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+    /**
+     * Create a new <code>Txn</code> that represents our state in the
+     * given <code>ServerTransaction</code>.
+     */
+    Txn(ServerTransaction tr, long id) {
+	this(id);
+	trId = tr.id;
+	this.tr = tr;
+	this.trm = new StorableReference(tr.mgr);
+	state = ACTIVE;
+
+	if (logger.isLoggable(Level.FINER)) {
+	    logger.log(Level.FINER, "creating txn for transaction mgr:" +
+		    "{0}, id:{1}, state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	}
+    }
+
+    /** Used in recovery */
+    Txn(long id) {
+	this.id = id;		// the txn id is not persisted
+    }
+
+    /**
+     * Get the id for this txn. Note that this id is NOT the same as
+     * the ID of the transaction. Since that ID is not unique (it must
+     * be qualified with the <code>ServerTransaction</code> object) we create
+     * our own internal id to make txns unique. This is needed since we
+     * may not have the <code>Transaction</code> unmarshalled.
+     */
+    Long getId() {
+	return Long.valueOf(id);
+    }
+
+    /**
+     * We keep the transaction ID around because we may need it
+     * to identify a broken transaction after recovery.
+     */
+    long getTransactionId() {
+	return trId;
+    }
+
+    /**
+     * Return our local view of the current state. Need to be holding
+     * the lock on this object or have called <code>ensureActive</code>
+     * to get the current value.
+     */
+    int getState() {
+	return state;
+    }
+
+    /**
+     * Atomically checks that this transaction is in the active
+     * state and locks the transaction in the active state.
+     * The lock can be released by calling <code>allowStateChange</code>.
+     * Each call to this method should be paired with a call to
+     * <code>allowStateChange</code> in a finally block.
+     * @throws CannotJoinException if the transaction
+     * is not active or a state change is pending.  
+     */
+    synchronized void ensureActive() throws CannotJoinException {
+	if (state != ACTIVE || stateChangeWaiting) {
+	    final String msg = "transaction mgr:" + tr + ", id:" + trId +
+		" not active, in state " + TxnConstants.getName(state);
+	    final CannotJoinException e = new CannotJoinException(msg);
+	    logger.log(Levels.FAILED, msg, e);
+	    throw e; 
+	}
+	assert stateReaders >= 0;
+	stateReaders++;
+    }
+
+    /**
+     * Release the read lock created by an <code>ensureActive</code>
+     * call. Does nothing if the transaction is not active or there is
+     * a state change pending and thus is safe to call even if the
+     * corresponding <code>ensureActive</code> call threw
+     * <code>CannotJoinException</code>.  
+     */
+    synchronized void allowStateChange() {
+	if (state != ACTIVE || stateChangeWaiting)
+	    return;
+	stateReaders--;
+	assert stateReaders >= 0;
+	notifyAll();
+    }
+
+    /**
+     * Prevents new operations from being started under this
+     * transaction and blocks until in process operations are
+     * completed.
+     */
+    synchronized void makeInactive() {
+	stateChangeWaiting = true;
+	assert stateReaders >= 0;
+	while (stateReaders != 0) {
+	    try {
+		wait();
+	    } catch (InterruptedException e) {
+		throw new AssertionError(e);
+	    }
+	    assert stateReaders >= 0;
+	}
+    }
+
+    /**
+     * Prepare for transaction commit. <code>makeInactive</code> must have 
+     * been called on this transaction first.
+     */
+    synchronized int prepare(OutriggerServerImpl space) {
+	assert stateChangeWaiting : "prepare called before makeInactive";
+	assert stateReaders == 0 : "prepare called before makeInactive completed";
+
+	if (logger.isLoggable(Level.FINER)) {
+	    logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
+		"state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	}
+
+	switch (state) {
+	  case ABORTED:			       // previously aborted
+	    return ABORTED;
+
+	  case COMMITTED:		       // previously committed
+	    throw new IllegalStateException(); // "cannot happen"
+
+	  case NOTCHANGED:		       // previously voted NOTCHANGED
+	  case PREPARED:		       // previously voted PREPARED
+	    return state;		       // they are idempotent, and
+	                                       // and we have nothing to do
+	                                       // so return
+
+	  case ACTIVE:			       // currently active
+	    boolean changed = false;	       // did this txn change
+	                                       // anything?
+
+	    if (logger.isLoggable(Level.FINEST)) {
+		logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
+		    "{0}, id:{1}, state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	    }
+
+	    // loop through Transactable members of this Txn
+	    final Iterator<Transactable> i = txnables.iterator();
+	    int c=0; // Counter for debugging message
+	    while (i.hasNext()) {
+		// get this member's vote
+		final Transactable transactable = i.next();
+		final int prepState =  transactable.prepare(this, space);
+		if (logger.isLoggable(Level.FINEST)) {
+		    logger.log(Level.FINEST, "prepare:prepared " +
+		        "transactable {0} for transaction mgr:{1}, id:{2}," +
+			" transactable now in state {3}", 
+			new Object[]{transactable, tr, Long.valueOf(trId), 
+				     TxnConstants.getName(prepState)});
+		}
+
+		switch (prepState) {
+		  case PREPARED:	     // has prepared state
+		    changed = true;	     // this means a change
+		    continue;
+
+		  case ABORTED:		     // has to abort
+		    abort(space);	     // abort this txn (does cleanup)
+		    state = ABORTED;
+		    return state;	     // vote aborted
+
+		  case NOTCHANGED:	     // no change
+		    i.remove();              // Won't need to call again
+		    continue;
+
+		  default:		     // huh?
+		    throw new
+			InternalSpaceException("prepare said " + prepState);
+		}
+	    }
+
+	    if (changed) {
+		state = PREPARED;
+		// have to watch this since it's now holding permanent 
+		// resources
+		space.monitor(Collections.nCopies(1, this));
+	    } else {
+		state = NOTCHANGED;
+	    }
+	    break;
+
+	  default:
+	    throw new IllegalStateException("unknown Txn state: " + state);
+	}
+
+	return state;
+    }
+
+    /**
+     * Abort the transaction.  This must be callable from
+     * <code>prepare</code> because if a <code>Transactable</code>
+     * votes <code>ABORT</code>, this method is called to make that
+     * happen. <code>makeInactive</code> must have been called on this
+     * transaction first.
+     */
+    synchronized void abort(OutriggerServerImpl space) {
+	assert stateChangeWaiting : "abort called before makeInactive";
+	assert stateReaders == 0 : "abort called before makeInactive completed";
+
+	if (logger.isLoggable(Level.FINER)) {
+	    logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
+		"state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	}
+
+	switch (state) {
+	  case ABORTED:		// already aborted
+	  case NOTCHANGED:	// nothing to abort
+	    break;
+
+	  case COMMITTED:	// "cannot happen"
+	    throw new IllegalStateException("aborting a committed txn");
+
+	  case ACTIVE:
+	  case PREPARED:
+	    if (logger.isLoggable(Level.FINEST)) {
+		logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
+		    "{0}, id:{1}, state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	    }
+
+	    final Iterator<Transactable> i = txnables.iterator();
+	    while (i.hasNext()) {
+                i.next().abort(this, space);
+            }
+	    state = ABORTED;
+	    cleanup();
+	    break;
+
+	  default:
+	    throw new IllegalStateException("unknown Txn state: " + state);
+	}
+    }
+
+    /**
+     * Having prepared, roll the changes
+     * forward. <code>makeInactive</code> must have been called on
+     * this transaction first.
+     */
+    synchronized void commit(OutriggerServerImpl space) {
+	assert stateChangeWaiting : "commit called before makeInactive";
+	assert stateReaders == 0 : "commit called before makeInactive completed";
+
+	//!! Need to involve mgr here
+	if (logger.isLoggable(Level.FINER)) {
+	    logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
+		"state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	}
+
+	switch (state) {
+	  case ABORTED:		// "cannot happen" stuff
+	  case ACTIVE:
+	  case NOTCHANGED:
+	    throw new IllegalStateException("committing "
+		+ TxnConstants.getName(state) + " txn");
+
+	  case COMMITTED:	// previous committed, that's okay
+	    return;
+
+	  case PREPARED:	// voted PREPARED, time to finish up
+	    final Iterator<Transactable> i = txnables.iterator();
+	    if (logger.isLoggable(Level.FINEST)) {
+		logger.log(Level.FINEST, "commit:committing transaction mgr:" +
+		    "{0}, id:{1}, state:{2}", 
+		new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+	    }
+
+	    while (i.hasNext()) {
+                i.next().commit(this, space);
+            }
+	    state = COMMITTED;
+	    cleanup();
+	    return;
+
+	  default:
+	    throw new IllegalStateException("unknown Txn state: " + state);
+	}
+    }
+
+    /**
+     * Caution: see locking discussion at the class level.
+     */
+    public synchronized Transactable add(Transactable t) {
+	txnables.add(t);
+	return t;
+    }
+
+    // inherit doc comment
+    public ServerTransaction getTransaction(ProxyPreparer preparer)
+	throws IOException, ClassNotFoundException
+    {
+        synchronized (this){
+            if (tr == null) {
+                final TransactionManager mgr = 
+                    (TransactionManager)trm.get(preparer);
+                tr = new ServerTransaction(mgr, trId);
+            }
+            return tr;
+        }
+    }
+
+    /**
+     * Return the manager associated with this transaction.
+     * @return the manager associated with this transaction.
+     * @throws IllegalStateException if this <code>Txn</code>
+     *         is still broken.
+     */
+    synchronized TransactionManager getManager() {
+	if (tr == null)
+	    throw new IllegalStateException("Txn is still broken");
+	return tr.mgr;
+    }
+
+    /**
+     * Return the monitor task for this object. Note, this
+     * method is unsynchronized because it (and 
+     * <code>monitorTask(TxnMonitorTask)</code> are both called
+     * from the same thread.
+     */
+    TxnMonitorTask monitorTask() {
+	return monitorTask;
+    }
+
+    /**
+     * Set the monitor task for this object. Note, this method is
+     * unsynchronized because it (and <code>monitorTask()</code> are
+     * both called from the same thread.
+     */
+    void monitorTask(TxnMonitorTask task) {
+	monitorTask = task;
+    }
+
+    /**
+     * Clean up any state when the transaction is finished.
+     */
+    private void cleanup() {
+	if (monitorTask != null)
+	    monitorTask.cancel(false);	    }
+
+    // -----------------------------------
+    //  Methods required by StorableObject
+    // -----------------------------------
+
+    // inherit doc comment
+    public void store(ObjectOutputStream out) throws IOException {
+	/* There is a bunch of stuff we don't need to write. The
+	 * Txn id not stored since it is handed back during
+	 * recovery. The content is rebuilt txnables by the various
+	 * recoverWrite and recoverTake calls. state is not written
+	 * because it is always ACTIVE when we write, and always
+	 * needs to be PREPARED when we read it back.
+	 */
+        synchronized (this){
+            out.writeObject(trm);
+            out.writeLong(trId);
+        }
+    }
+
+    // inherit doc comment
+    public Txn restore(ObjectInputStream in) 
+	throws IOException, ClassNotFoundException 
+    {
+	/* Only transactions that got prepared and not committed or
+	 * aborted get recovered
+	 */
+        synchronized (this){
+            state    = PREPARED;
+            trm      = (StorableReference)in.readObject();
+            trId     = in.readLong();
+        }
+        return this;
+    }
+
+    @Override
+    public int compareTo(Txn o) {
+        if (o == null) return -1;
+        if (o.id < id) return -1;
+        if (o.id > id) return 1;
+        return 0;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
+        return hash;
+    }
+    
+    public boolean equals(Object o){
+        if (!(o instanceof Txn)) return false;
+        Txn txn = (Txn) o;
+        return id == txn.id;
+    }
+}

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java Sun Jul  5 11:41:39 2020
@@ -18,7 +18,7 @@
 package org.apache.river.outrigger;
 
 import org.apache.river.config.Config;
-import org.apache.river.thread.WakeupManager;
+import org.apache.river.thread.wakeup.WakeupManager;
 
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java Sun Jul  5 11:41:39 2020
@@ -1,584 +1,584 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.constants.ThrowableConstants;
-import org.apache.river.logging.Levels;
-import org.apache.river.thread.RetryTask;
-import org.apache.river.thread.WakeupManager;
-
-import java.io.IOException;
-import java.rmi.RemoteException;
-import java.rmi.UnmarshalException;
-import java.rmi.NoSuchObjectException;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.TransactionException;
-import net.jini.core.transaction.UnknownTransactionException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-
-/**
- * A task that will try to validate the state of a transaction.  This
- * uses weak references a good deal to let the other parts of the system
- * be GC'ed as necessary.
- * <p>
- * The retry mechanism is subtle, so bear with me.  The purpose is
- * to ensure that if any activity is being blocked by a given
- * transaction, that transaction will be tested at some point in
- * the future (if necessary, i.e., if it still is thought to be
- * active).  We assume it to be rare that a transactions that the
- * space thinks is active is, in fact, aborted, so the algorithm is
- * designed to guarantee the detection without a lot of overhead,
- * specifically without a lot of RMI calls.
- * <p>
- * Each task has three values: a <code>nextQuery</code> time, a
- * <code>mustQuery</code> boolean that force the next query to be
- * made, and <code>deltaT</code>, the time at which the following
- * query will be scheduled.  When the task is awakened at its
- * <code>nextQuery</code> time, it checks to see if it must make an
- * actual query to the transaction manager, which it will do if either
- * <code>mustQuery</code> is <code>true</code>, or if we know about
- * any in progress queries on the space that are blocked on the
- * transaction.  Whether or not an actual query is made,
- * <code>deltaT</code> is added to <code>nextQuery</code> to get the
- * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
- * <code>mustQuery</code> boolean is set to <code>false</code>.
- * <p>
- * There are two kinds of requests that a with which transaction
- * can cause a conflict -- those with long timeouts (such as
- * blocking reads and takes) and those that are under short timeouts
- * (such as reads and takes with zero-length timeouts).  We will
- * treat them separately at several points of the algorithm.  A
- * short timeout is any query whose expiration time is sooner than
- * the <code>nextQuery</code> time.  Any other timeout is long
- * If a short query arrives, <code>mustQuery</code> is set to 
- * <code>true</code>.
- * <p>
- * The result is that any time a transaction causes a conflict, if
- * the query on the space has not ended by the time of the
- * <code>nextQuery</code> we will attempt to poll the transaction manager.  
- * There will also poll the transaction manager if any conflict occurred
- * on a query on the space with a short timeout.
- * <p>
- * The first time a transaction causes a conflict, we schedule a
- * time in the future at which we will poll its status.  We do not
- * poll right away because often a transaction will complete on
- * its own before we get to that time, making the check
- * unnecessary.  An instant poll is, therefore, unnecessarily
- * aggressive, since giving an initial grace time will usually mean
- * no poll is made at all.  So if the first conflict occurs at
- * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
- * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
- * will be <code>true</code> to force that poll to happen, and
- * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see TxnMonitor 
- */
-class TxnMonitorTask extends RetryTask
-    implements TransactionConstants, org.apache.river.constants.TimeConstants
-{
-    /** transaction being monitored */
-    private final Txn	txn;
-
-    /** the monitor we were made by */
-    private final TxnMonitor monitor; 
-
-    /**
-     * All the queries on the space (not queries to the transaction
-     * manager) waiting for <code>txn</code> to be resolved.
-     * <code>null</code> until we have at least one. Represented by
-     * <code>QueryWatcher</code> objects.  
-     */
-    private Map<QueryWatcher,Collection<Txn>>		queries; //Sync on this.
-
-    /** count of RemoteExceptions */
-    private final AtomicInteger		failCnt;
-
-    /** 
-     * The next time we need to poll the transaction manager 
-     * to get <code>txn</code>'s actual state.
-     */
-    private final AtomicLong	nextQuery;
-
-    /**
-     * When we're given an opportunity to poll the transaction manager
-     * for the <code>txn</code>'s state, do so.
-     */
-    private volatile boolean	mustQuery;
-
-    /** next value added to <code>nextQuery</code> */
-    private volatile long	deltaT;
-
-    /**
-     * The initial grace period before the first query.
-     */
-    private static final long	INITIAL_GRACE = 15 * SECONDS;
-
-    /**
-     * The retry time when we have an encountered an exception
-     */
-    private static final long	BETWEEN_EXCEPTIONS = 15 * SECONDS;
-
-    /**
-     * The largest value that <code>deltaT</code> will reach.
-     */
-    private static final long	MAX_DELTA_T = 1 * HOURS;
-
-    /**
-     * The maximum number of failures allowed in a row before we simply
-     * give up on the transaction and consider it aborted.
-     */
-    private static final int	MAX_FAILURES = 3;
-
-    /** Logger for logging transaction related information */
-    private static final Logger logger = 
-	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
-    /**
-     * Create a new TxnMonitorTask.
-     */
-    TxnMonitorTask(Txn txn, TxnMonitor monitor,
-		   ExecutorService manager, WakeupManager wakeupMgr) {
-	super(manager, wakeupMgr);
-	this.txn = txn;
-	this.monitor = monitor;
-	nextQuery = new AtomicLong(startTime());	// retryTime will add INITIAL_GRACE
-	deltaT = INITIAL_GRACE;
-	mustQuery = true;
-        failCnt = new AtomicInteger();
-    }
-
-    /**
-     * Return the time of the next query, bumping <code>deltaT</code> as
-     * necessary for the next iteration.  If the transaction has voted
-     * <code>PREPARED</code> or the manager has been giving us a
-     * <code>RemoteException</code>, we should retry on short times;
-     * otherwise we back off quickly.
-     */
-    public long retryTime() {
-        boolean noFailures = false;
-        synchronized (txn){
-            noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
-        }
-            if (noFailures) {      // no failures
-                if (logger.isLoggable(Level.FINEST)) {
-                    logger.log(Level.FINEST, "{0} retryTime adds {1}", 
-                               new Object[]{this, Long.valueOf(deltaT)});
-                }
-
-                nextQuery.addAndGet(deltaT);
-                synchronized (this){
-                    if (deltaT < MAX_DELTA_T)
-                        deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
-                }
-            } else {
-                if (logger.isLoggable(Level.FINEST)) {
-                    logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})", 
-                               new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS), 
-                                   (failCnt.get() != 0 ? "failure" : "PREPARED")});
-                }
-                nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
-            }
-        
-	return nextQuery.get();
-    }
-
-    /**
-     * Add a ``sibling'' transaction, one that is now blocking progress
-     * on one of the same entries.  For example, if a client is blocked
-     * on a <code>read</code>, another transaction can read the same
-     * entry, thereby also blocking that same client.  This means that
-     * the transaction for the second <code>read</code> must be
-     * watched, too.  The list of queries for the second transaction
-     * might be less that the list of those in this transaction, but
-     * the process of figuring out the subset is too expensive, since
-     * we have tried to make the checking process itself cheap,
-     * anyway.  So we add all queries this task is currently monitoring
-     * to the task monitoring the second transaction.  If there are
-     * no queries, then the blocking occurred because of a short query
-     * or all the queries have expired, in which case the second transaction
-     * isn't blocking the way of anything currently, so this method does
-     * nothing.
-     * <p>
-     * Of course, in order to avoid blocking the thread that is calling
-     * this (which is trying to perform a <code>read</code>, after
-     * all), we simply add each lease in this task to the monitor's
-     * queue.
-     *
-     */
-    // @see TxnEntryHandle#monitor
-    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
-    //!! search for the transaction with the smallest set of leases?  -arnold
-    synchronized void addSibling(Txn txn) {
-	if (queries == null || queries.size() == 0)
-	    return;
-	Collection<Txn> sibling = Collections.nCopies(1, txn);
-	Iterator<QueryWatcher> it = queries.keySet().iterator();
-	while (it.hasNext()) {
-	    QueryWatcher query = it.next();
-	    if (query != null)	// from a weak map, so might be null
-		monitor.add(query, sibling);
-	}
-    }
-
-    /**
-     * Try to see if this transaction should be aborted.  This returns
-     * <code>true</code> (don't repeat the task) if it knows that the
-     * transaction is no longer interesting to anyone.
-     */
-    public boolean tryOnce() {
-	if (logger.isLoggable(Level.FINEST)) {
-	    logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}", 
-	        new Object[]{this, Integer.valueOf(attempt()), 
-			     Boolean.valueOf(mustQuery) });
-	}
-
-	/*
-	 * The first time we do nothing, since RetryTask invokes run first,
-	 * but we want to wait a bit before testing the transaction.
-	 */
-	if (attempt() == 0)
-	    return false;
-        int txnState;
-        synchronized (txn){
-            txnState = txn.getState();
-        }
-	if (logger.isLoggable(Level.FINEST)) {
-	    logger.log(Level.FINEST, "{0} txn.getState() = {1}", 
-	        new Object[]{this, Integer.valueOf(txnState)});
-	}
-
-	// not active or prepared == no longer blocking
-	
-	if (txnState != ACTIVE && txnState != PREPARED)
-	    return true;
-
-	// if we're prepared, test every time -- this shouldn't take long
-	mustQuery |= (txnState == PREPARED);
-
-	/*
-	 * Go through the resources to see if we can find one still active
-	 * that cares.  Must be synchronized since we test, then clear --
-	 * another thread that set the flag between the test and clear
-	 * would have its requirements lost.
-	 */
-	synchronized (this) {
-	    if (!mustQuery) {		// then try resources
-		if (queries == null)	// no resources, so nobody wants it
-		    return false;	// try again next time
-
-		Iterator<QueryWatcher> it = queries.keySet().iterator();
-		boolean foundNeed = false;
-
-		if (logger.isLoggable(Level.FINEST)) {
-		    logger.log(Level.FINEST, "{0} nextQuery {1}", 
-			       new Object[]{this, nextQuery});
-		}
-
-		while (it.hasNext()) {
-		    QueryWatcher query = it.next();
-		    if (query == null)     // gone -- the map will reap it
-			continue;
-		    if (logger.isLoggable(Level.FINEST)) {
-			logger.log(Level.FINEST, 
-				   "{0} query.getExpiration() {1}", 
-				   new Object[]{this, 
-				       Long.valueOf(query.getExpiration())});
-		    }
-
-		    if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
-                        it.remove();
-                    }	// expired, so we don't care about it
-		    else {
-			foundNeed = true;
-			break;
-		    }
-		}
-
-		if (logger.isLoggable(Level.FINEST)) {
-		    logger.log(Level.FINEST, "{0} foundNeed = {1}", 
-			       new Object[]{this, Boolean.valueOf(foundNeed)});
-		}
-
-		if (!foundNeed)		// nobody wants it
-		    return false;	// try again next time
-	    }
-	    mustQuery = false;		// clear it for next time
-	}
-
-	/*
-	 * Now we know (a) the transaction itself is alive, and (b) some
-	 * lease still cares.  Make sure it's still active as far as the
-	 * it knows, and if it is, then ask the manager about it.
-	 */
-	ServerTransaction tr;
-	try {
-	    /* This may fix a broken Txn, if it does it won't get moved
-	     * from the broken to the unbroken list. It will get
-	     * moved eventually, but it does seem unfortunate it does
-	     * not happen immediately
-	     */
-	    tr = txn.getTransaction(
-		monitor.space().getRecoveredTransactionManagerPreparer());
-	} catch (RemoteException e) {
-	    final int cat = ThrowableConstants.retryable(e);
-
-	    if (cat == ThrowableConstants.BAD_INVOCATION ||
-		cat == ThrowableConstants.BAD_OBJECT)
-	    {
-		// Not likely to get better, give up
-		logUnpackingFailure("definite exception", Level.INFO,
-				    true, e);				    
-		return true;
-	    } else if (cat == ThrowableConstants.INDEFINITE) {
-		// try, try, again
-		logUnpackingFailure("indefinite exception", Levels.FAILED,
-				    false, e);				    
-		mustQuery = true;
-		return false;
-	    } else if (cat == ThrowableConstants.UNCATEGORIZED) {
-		// Same as above but log differently.
-		mustQuery = true;
-		logUnpackingFailure("uncategorized exception", Level.INFO,
-				    false, e);				    
-		return false;
-	    } else {
-		logger.log(Level.WARNING, "ThrowableConstants.retryable " +
-			   "returned out of range value, " + cat,
-			   new AssertionError(e));
-		return false;
-	    }
-	} catch (IOException e) {
-	    // Not likely to get better
-	    logUnpackingFailure("IOException", Level.INFO, true, e);
-	    return true;
-	} catch (RuntimeException e) {
-	    // Not likely to get better
-	    logUnpackingFailure("RuntimeException", Level.INFO, true, e);
-	    return true;
-	} catch (ClassNotFoundException e) {
-	    // codebase probably down, keep trying
-	    logUnpackingFailure("ClassNotFoundException", Levels.FAILED, 
-				false, e);
-	    mustQuery = true;
-	    return false;
-	}
-
-	if (logger.isLoggable(Level.FINEST))
-	    logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
-
-	int trState;
-	try {
-	    trState = tr.getState();
-	} catch (TransactionException e) {
-	    if (logger.isLoggable(Level.INFO))
-		logger.log(Level.INFO, "Got TransactionException when " +
-		    "calling getState on " + tr + ", dropping transaction " +
-		    tr.id, e);
-	    trState = ABORTED;
-	} catch (NoSuchObjectException e) {
-	    /* It would be epsilon better to to give up immediately
-	     * if we get a NoSuchObjectException and we are in the
-	     * active state, however, the code to do this would
-	     * be very complicated since we need to hold a lock to
-	     * while reading and acting on the state.
-	     */
-	    if (failCnt.incrementAndGet() >= MAX_FAILURES) {
-		if (logger.isLoggable(Level.INFO)) {
-		    logger.log(Level.INFO, "Got NoSuchObjectException when " +
-			"calling getState on " + tr + ", this was the " +
-			failCnt + " RemoteException, dropping transaction" +
-			tr.id, e);
-		}
-		trState = ABORTED;
-	    } else {
-		if (logger.isLoggable(Levels.FAILED)) {
-		    logger.log(Levels.FAILED, "Got NoSuchObjectException " +
-			"when calling getState on " + tr + ", failCount = " +
-			failCnt + ", will retry", e);
-		}
-		mustQuery = true;      // keep on trying
-		return false;	       // try again next time
-	    }
-	} catch (RemoteException e) {
-	    if (failCnt.incrementAndGet() >= MAX_FAILURES) {
-		/* abort if we are not prepared and not already 
-		 * aborted. If prepared retry, otherwise
-		 * we're done. Check state and make any abort() call
-		 * atomically so we can't accidently abort a prepared
-		 * transaction.
-		 */
-		synchronized (txn) {
-		    switch (txn.getState()) {
-		      case ACTIVE:
-			// Safe to abort, give up
-			if (logger.isLoggable(Level.INFO)) {
-			    logger.log(Level.INFO, "Got RemoteException " +
-			        "when calling getState on " + tr + ", this " +
-                                "was " + failCnt + " RemoteException, " +
-				"dropping active transaction " + tr.id, e);
-			}
-
-			try {
-			    monitor.space().abort(tr.mgr, tr.id);
-			    return true;
-			} catch (UnknownTransactionException ute) {
-			    throw new AssertionError(ute);
-			} catch (UnmarshalException ume) {
-			    throw new AssertionError(ume);
-			}
-		      case PREPARED:
-		        final Level l = (failCnt.get()%MAX_FAILURES == 0)?
-			    Level.INFO:Levels.FAILED;
-			if (logger.isLoggable(l)) {
-			    logger.log(l, "Got RemoteException when calling " +
-				"getState on " + tr + ", this was " + 
-				failCnt + " RemoteException, will keep " +
-				"prepared transaction " + tr.id, e);
-			}
-
-			// Can't give up, keep on trying to find real state
-			mustQuery = true;
-			return false;
-	 	      case ABORTED:
-		      case COMMITTED:
-			// done
-			return true;
-		      default:
-			throw new AssertionError("Txn in unreachable state");
-		    }
-		}
-	    } else {
-		// Don't know, but not ready to give up
-		if (logger.isLoggable(Levels.FAILED)) {
-		    logger.log(Levels.FAILED, "Got RemoteException when " +
-			"calling getState on " + tr + ", failCount = " +
-			failCnt + ", will retry", e);
-		}
-
-		mustQuery = true;      // keep on trying
-		return false;	       // try again next time
-	    }
-	}
-    
-	if (logger.isLoggable(Level.FINER)) {
-	    logger.log(Level.FINER, "{0} trState = {1}", 
-		       new Object[]{this, Integer.valueOf(trState)});
-	}
-
-	failCnt.set(0);		       // reset failures -- we got a response
-
-	/*
-	 * If the two states aren't the same, the state changed and we
-	 * need to account for that locally here by calling the method
-	 * that would make the change (the one we should have gotten.
-	 * (We use the external forms of abort, commit, etc., because
-	 * they are what the manager would call, and therefore these
-	 * calls will always do exactly what the incoming manager
-	 * calls would have done.  I don't want this to be fragile by
-	 * bypassing those calls and going straight to the Txn
-	 * object's calls, which might skip something important in the
-	 * OutriggerServerImpl calls).
-	 */
-
-	if (trState != txnState) {
-	    if (logger.isLoggable(Level.FINER)) {
-		logger.log(Level.FINER, 
-		    "{0} mgr state[{1}] != local state [{2}]", 
-		    new Object[]{this,
-				 TxnConstants.getName(trState),
-				 TxnConstants.getName(txnState)});
-	    }
-
-	    try {
-		switch (trState) {
-		  case ABORTED:
-		    logger.log(Level.FINER, "{0} moving to abort", this);
-			  
-		    monitor.space().abort(tr.mgr, tr.id);
-		    return true;
-
-		  case COMMITTED:
-		    logger.log(Level.FINER, "{0} moving to commit", this);
-
-		    monitor.space().commit(tr.mgr, tr.id);
-		    return true;
-		}
-	    } catch (UnknownTransactionException e) {
-		// we must somehow have already gotten the abort() or
-		// commit(), and have therefore forgotten about the
-		// transaction, while this code was executing
-		return true;
-	    } catch (UnmarshalException ume) {
-		throw new AssertionError(ume);
-	    }
-
-	    // we can't fake anything else -- the manager will have to call
-	    // us
-	}
-
-	logger.log(Level.FINEST, "{0} return false", this);
-
-	return false;			// now we know so nothing more to do
-    }
-
-    /**
-     * Add in a resource.  The lease may already be in, in which case it is
-     * ignored, or it may be null, in which case it was a non-leased probe
-     * that was blocked and we simply set <code>mustQuery</code> to
-     * <code>true</code>.
-     */
-    synchronized void add(QueryWatcher query) {
-	if (query == null || query.getExpiration() <= nextQuery.get()) {
-	    if (logger.isLoggable(Level.FINEST))
-		logger.log(Level.FINEST, "adding resource to task -- SHORT");
-	    mustQuery = true;
-	} else {
-	    if (logger.isLoggable(Level.FINEST))
-		logger.log(Level.FINEST, "adding resource to task -- LONG");
-	    if (queries == null)
-		queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
-	    queries.put(query, null);
-	}
-    }
-
-    /** Log failed unpacking attempt attempt */
-    private void logUnpackingFailure(String exceptionDescription, Level level,
-				     boolean terminal, Throwable t) 
-    {
-	if (logger.isLoggable(level)) {
-	    logger.log(level, "Encountered " + exceptionDescription +
-		"while unpacking exception to check state, " +
-		(terminal?"dropping":"keeping") +  " monitoring task", t);
-	}
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.constants.ThrowableConstants;
+import org.apache.river.logging.Levels;
+import org.apache.river.thread.wakeup.RetryTask;
+import org.apache.river.thread.wakeup.WakeupManager;
+
+import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.UnmarshalException;
+import java.rmi.NoSuchObjectException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.TransactionException;
+import net.jini.core.transaction.UnknownTransactionException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+
+/**
+ * A task that will try to validate the state of a transaction.  This
+ * uses weak references a good deal to let the other parts of the system
+ * be GC'ed as necessary.
+ * <p>
+ * The retry mechanism is subtle, so bear with me.  The purpose is
+ * to ensure that if any activity is being blocked by a given
+ * transaction, that transaction will be tested at some point in
+ * the future (if necessary, i.e., if it still is thought to be
+ * active).  We assume it to be rare that a transactions that the
+ * space thinks is active is, in fact, aborted, so the algorithm is
+ * designed to guarantee the detection without a lot of overhead,
+ * specifically without a lot of RMI calls.
+ * <p>
+ * Each task has three values: a <code>nextQuery</code> time, a
+ * <code>mustQuery</code> boolean that force the next query to be
+ * made, and <code>deltaT</code>, the time at which the following
+ * query will be scheduled.  When the task is awakened at its
+ * <code>nextQuery</code> time, it checks to see if it must make an
+ * actual query to the transaction manager, which it will do if either
+ * <code>mustQuery</code> is <code>true</code>, or if we know about
+ * any in progress queries on the space that are blocked on the
+ * transaction.  Whether or not an actual query is made,
+ * <code>deltaT</code> is added to <code>nextQuery</code> to get the
+ * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
+ * <code>mustQuery</code> boolean is set to <code>false</code>.
+ * <p>
+ * There are two kinds of requests that a with which transaction
+ * can cause a conflict -- those with long timeouts (such as
+ * blocking reads and takes) and those that are under short timeouts
+ * (such as reads and takes with zero-length timeouts).  We will
+ * treat them separately at several points of the algorithm.  A
+ * short timeout is any query whose expiration time is sooner than
+ * the <code>nextQuery</code> time.  Any other timeout is long
+ * If a short query arrives, <code>mustQuery</code> is set to 
+ * <code>true</code>.
+ * <p>
+ * The result is that any time a transaction causes a conflict, if
+ * the query on the space has not ended by the time of the
+ * <code>nextQuery</code> we will attempt to poll the transaction manager.  
+ * There will also poll the transaction manager if any conflict occurred
+ * on a query on the space with a short timeout.
+ * <p>
+ * The first time a transaction causes a conflict, we schedule a
+ * time in the future at which we will poll its status.  We do not
+ * poll right away because often a transaction will complete on
+ * its own before we get to that time, making the check
+ * unnecessary.  An instant poll is, therefore, unnecessarily
+ * aggressive, since giving an initial grace time will usually mean
+ * no poll is made at all.  So if the first conflict occurs at
+ * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
+ * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
+ * will be <code>true</code> to force that poll to happen, and
+ * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see TxnMonitor 
+ */
+class TxnMonitorTask extends RetryTask
+    implements TransactionConstants, org.apache.river.constants.TimeConstants
+{
+    /** transaction being monitored */
+    private final Txn	txn;
+
+    /** the monitor we were made by */
+    private final TxnMonitor monitor; 
+
+    /**
+     * All the queries on the space (not queries to the transaction
+     * manager) waiting for <code>txn</code> to be resolved.
+     * <code>null</code> until we have at least one. Represented by
+     * <code>QueryWatcher</code> objects.  
+     */
+    private Map<QueryWatcher,Collection<Txn>>		queries; //Sync on this.
+
+    /** count of RemoteExceptions */
+    private final AtomicInteger		failCnt;
+
+    /** 
+     * The next time we need to poll the transaction manager 
+     * to get <code>txn</code>'s actual state.
+     */
+    private final AtomicLong	nextQuery;
+
+    /**
+     * When we're given an opportunity to poll the transaction manager
+     * for the <code>txn</code>'s state, do so.
+     */
+    private volatile boolean	mustQuery;
+
+    /** next value added to <code>nextQuery</code> */
+    private volatile long	deltaT;
+
+    /**
+     * The initial grace period before the first query.
+     */
+    private static final long	INITIAL_GRACE = 15 * SECONDS;
+
+    /**
+     * The retry time when we have an encountered an exception
+     */
+    private static final long	BETWEEN_EXCEPTIONS = 15 * SECONDS;
+
+    /**
+     * The largest value that <code>deltaT</code> will reach.
+     */
+    private static final long	MAX_DELTA_T = 1 * HOURS;
+
+    /**
+     * The maximum number of failures allowed in a row before we simply
+     * give up on the transaction and consider it aborted.
+     */
+    private static final int	MAX_FAILURES = 3;
+
+    /** Logger for logging transaction related information */
+    private static final Logger logger = 
+	Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+    /**
+     * Create a new TxnMonitorTask.
+     */
+    TxnMonitorTask(Txn txn, TxnMonitor monitor,
+		   ExecutorService manager, WakeupManager wakeupMgr) {
+	super(manager, wakeupMgr);
+	this.txn = txn;
+	this.monitor = monitor;
+	nextQuery = new AtomicLong(startTime());	// retryTime will add INITIAL_GRACE
+	deltaT = INITIAL_GRACE;
+	mustQuery = true;
+        failCnt = new AtomicInteger();
+    }
+
+    /**
+     * Return the time of the next query, bumping <code>deltaT</code> as
+     * necessary for the next iteration.  If the transaction has voted
+     * <code>PREPARED</code> or the manager has been giving us a
+     * <code>RemoteException</code>, we should retry on short times;
+     * otherwise we back off quickly.
+     */
+    public long retryTime() {
+        boolean noFailures = false;
+        synchronized (txn){
+            noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
+        }
+            if (noFailures) {      // no failures
+                if (logger.isLoggable(Level.FINEST)) {
+                    logger.log(Level.FINEST, "{0} retryTime adds {1}", 
+                               new Object[]{this, Long.valueOf(deltaT)});
+                }
+
+                nextQuery.addAndGet(deltaT);
+                synchronized (this){
+                    if (deltaT < MAX_DELTA_T)
+                        deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
+                }
+            } else {
+                if (logger.isLoggable(Level.FINEST)) {
+                    logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})", 
+                               new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS), 
+                                   (failCnt.get() != 0 ? "failure" : "PREPARED")});
+                }
+                nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
+            }
+        
+	return nextQuery.get();
+    }
+
+    /**
+     * Add a ``sibling'' transaction, one that is now blocking progress
+     * on one of the same entries.  For example, if a client is blocked
+     * on a <code>read</code>, another transaction can read the same
+     * entry, thereby also blocking that same client.  This means that
+     * the transaction for the second <code>read</code> must be
+     * watched, too.  The list of queries for the second transaction
+     * might be less that the list of those in this transaction, but
+     * the process of figuring out the subset is too expensive, since
+     * we have tried to make the checking process itself cheap,
+     * anyway.  So we add all queries this task is currently monitoring
+     * to the task monitoring the second transaction.  If there are
+     * no queries, then the blocking occurred because of a short query
+     * or all the queries have expired, in which case the second transaction
+     * isn't blocking the way of anything currently, so this method does
+     * nothing.
+     * <p>
+     * Of course, in order to avoid blocking the thread that is calling
+     * this (which is trying to perform a <code>read</code>, after
+     * all), we simply add each lease in this task to the monitor's
+     * queue.
+     *
+     */
+    // @see TxnEntryHandle#monitor
+    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
+    //!! search for the transaction with the smallest set of leases?  -arnold
+    synchronized void addSibling(Txn txn) {
+	if (queries == null || queries.size() == 0)
+	    return;
+	Collection<Txn> sibling = Collections.nCopies(1, txn);
+	Iterator<QueryWatcher> it = queries.keySet().iterator();
+	while (it.hasNext()) {
+	    QueryWatcher query = it.next();
+	    if (query != null)	// from a weak map, so might be null
+		monitor.add(query, sibling);
+	}
+    }
+
+    /**
+     * Try to see if this transaction should be aborted.  This returns
+     * <code>true</code> (don't repeat the task) if it knows that the
+     * transaction is no longer interesting to anyone.
+     */
+    public boolean tryOnce() {
+	if (logger.isLoggable(Level.FINEST)) {
+	    logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}", 
+	        new Object[]{this, Integer.valueOf(attempt()), 
+			     Boolean.valueOf(mustQuery) });
+	}
+
+	/*
+	 * The first time we do nothing, since RetryTask invokes run first,
+	 * but we want to wait a bit before testing the transaction.
+	 */
+	if (attempt() == 0)
+	    return false;
+        int txnState;
+        synchronized (txn){
+            txnState = txn.getState();
+        }
+	if (logger.isLoggable(Level.FINEST)) {
+	    logger.log(Level.FINEST, "{0} txn.getState() = {1}", 
+	        new Object[]{this, Integer.valueOf(txnState)});
+	}
+
+	// not active or prepared == no longer blocking
+	
+	if (txnState != ACTIVE && txnState != PREPARED)
+	    return true;
+
+	// if we're prepared, test every time -- this shouldn't take long
+	mustQuery |= (txnState == PREPARED);
+
+	/*
+	 * Go through the resources to see if we can find one still active
+	 * that cares.  Must be synchronized since we test, then clear --
+	 * another thread that set the flag between the test and clear
+	 * would have its requirements lost.
+	 */
+	synchronized (this) {
+	    if (!mustQuery) {		// then try resources
+		if (queries == null)	// no resources, so nobody wants it
+		    return false;	// try again next time
+
+		Iterator<QueryWatcher> it = queries.keySet().iterator();
+		boolean foundNeed = false;
+
+		if (logger.isLoggable(Level.FINEST)) {
+		    logger.log(Level.FINEST, "{0} nextQuery {1}", 
+			       new Object[]{this, nextQuery});
+		}
+
+		while (it.hasNext()) {
+		    QueryWatcher query = it.next();
+		    if (query == null)     // gone -- the map will reap it
+			continue;
+		    if (logger.isLoggable(Level.FINEST)) {
+			logger.log(Level.FINEST, 
+				   "{0} query.getExpiration() {1}", 
+				   new Object[]{this, 
+				       Long.valueOf(query.getExpiration())});
+		    }
+
+		    if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
+                        it.remove();
+                    }	// expired, so we don't care about it
+		    else {
+			foundNeed = true;
+			break;
+		    }
+		}
+
+		if (logger.isLoggable(Level.FINEST)) {
+		    logger.log(Level.FINEST, "{0} foundNeed = {1}", 
+			       new Object[]{this, Boolean.valueOf(foundNeed)});
+		}
+
+		if (!foundNeed)		// nobody wants it
+		    return false;	// try again next time
+	    }
+	    mustQuery = false;		// clear it for next time
+	}
+
+	/*
+	 * Now we know (a) the transaction itself is alive, and (b) some
+	 * lease still cares.  Make sure it's still active as far as the
+	 * it knows, and if it is, then ask the manager about it.
+	 */
+	ServerTransaction tr;
+	try {
+	    /* This may fix a broken Txn, if it does it won't get moved
+	     * from the broken to the unbroken list. It will get
+	     * moved eventually, but it does seem unfortunate it does
+	     * not happen immediately
+	     */
+	    tr = txn.getTransaction(
+		monitor.space().getRecoveredTransactionManagerPreparer());
+	} catch (RemoteException e) {
+	    final int cat = ThrowableConstants.retryable(e);
+
+	    if (cat == ThrowableConstants.BAD_INVOCATION ||
+		cat == ThrowableConstants.BAD_OBJECT)
+	    {
+		// Not likely to get better, give up
+		logUnpackingFailure("definite exception", Level.INFO,
+				    true, e);				    
+		return true;
+	    } else if (cat == ThrowableConstants.INDEFINITE) {
+		// try, try, again
+		logUnpackingFailure("indefinite exception", Levels.FAILED,
+				    false, e);				    
+		mustQuery = true;
+		return false;
+	    } else if (cat == ThrowableConstants.UNCATEGORIZED) {
+		// Same as above but log differently.
+		mustQuery = true;
+		logUnpackingFailure("uncategorized exception", Level.INFO,
+				    false, e);				    
+		return false;
+	    } else {
+		logger.log(Level.WARNING, "ThrowableConstants.retryable " +
+			   "returned out of range value, " + cat,
+			   new AssertionError(e));
+		return false;
+	    }
+	} catch (IOException e) {
+	    // Not likely to get better
+	    logUnpackingFailure("IOException", Level.INFO, true, e);
+	    return true;
+	} catch (RuntimeException e) {
+	    // Not likely to get better
+	    logUnpackingFailure("RuntimeException", Level.INFO, true, e);
+	    return true;
+	} catch (ClassNotFoundException e) {
+	    // codebase probably down, keep trying
+	    logUnpackingFailure("ClassNotFoundException", Levels.FAILED, 
+				false, e);
+	    mustQuery = true;
+	    return false;
+	}
+
+	if (logger.isLoggable(Level.FINEST))
+	    logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
+
+	int trState;
+	try {
+	    trState = tr.getState();
+	} catch (TransactionException e) {
+	    if (logger.isLoggable(Level.INFO))
+		logger.log(Level.INFO, "Got TransactionException when " +
+		    "calling getState on " + tr + ", dropping transaction " +
+		    tr.id, e);
+	    trState = ABORTED;
+	} catch (NoSuchObjectException e) {
+	    /* It would be epsilon better to to give up immediately
+	     * if we get a NoSuchObjectException and we are in the
+	     * active state, however, the code to do this would
+	     * be very complicated since we need to hold a lock to
+	     * while reading and acting on the state.
+	     */
+	    if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+		if (logger.isLoggable(Level.INFO)) {
+		    logger.log(Level.INFO, "Got NoSuchObjectException when " +
+			"calling getState on " + tr + ", this was the " +
+			failCnt + " RemoteException, dropping transaction" +
+			tr.id, e);
+		}
+		trState = ABORTED;
+	    } else {
+		if (logger.isLoggable(Levels.FAILED)) {
+		    logger.log(Levels.FAILED, "Got NoSuchObjectException " +
+			"when calling getState on " + tr + ", failCount = " +
+			failCnt + ", will retry", e);
+		}
+		mustQuery = true;      // keep on trying
+		return false;	       // try again next time
+	    }
+	} catch (RemoteException e) {
+	    if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+		/* abort if we are not prepared and not already 
+		 * aborted. If prepared retry, otherwise
+		 * we're done. Check state and make any abort() call
+		 * atomically so we can't accidently abort a prepared
+		 * transaction.
+		 */
+		synchronized (txn) {
+		    switch (txn.getState()) {
+		      case ACTIVE:
+			// Safe to abort, give up
+			if (logger.isLoggable(Level.INFO)) {
+			    logger.log(Level.INFO, "Got RemoteException " +
+			        "when calling getState on " + tr + ", this " +
+                                "was " + failCnt + " RemoteException, " +
+				"dropping active transaction " + tr.id, e);
+			}
+
+			try {
+			    monitor.space().abort(tr.mgr, tr.id);
+			    return true;
+			} catch (UnknownTransactionException ute) {
+			    throw new AssertionError(ute);
+			} catch (UnmarshalException ume) {
+			    throw new AssertionError(ume);
+			}
+		      case PREPARED:
+		        final Level l = (failCnt.get()%MAX_FAILURES == 0)?
+			    Level.INFO:Levels.FAILED;
+			if (logger.isLoggable(l)) {
+			    logger.log(l, "Got RemoteException when calling " +
+				"getState on " + tr + ", this was " + 
+				failCnt + " RemoteException, will keep " +
+				"prepared transaction " + tr.id, e);
+			}
+
+			// Can't give up, keep on trying to find real state
+			mustQuery = true;
+			return false;
+	 	      case ABORTED:
+		      case COMMITTED:
+			// done
+			return true;
+		      default:
+			throw new AssertionError("Txn in unreachable state");
+		    }
+		}
+	    } else {
+		// Don't know, but not ready to give up
+		if (logger.isLoggable(Levels.FAILED)) {
+		    logger.log(Levels.FAILED, "Got RemoteException when " +
+			"calling getState on " + tr + ", failCount = " +
+			failCnt + ", will retry", e);
+		}
+
+		mustQuery = true;      // keep on trying
+		return false;	       // try again next time
+	    }
+	}
+    
+	if (logger.isLoggable(Level.FINER)) {
+	    logger.log(Level.FINER, "{0} trState = {1}", 
+		       new Object[]{this, Integer.valueOf(trState)});
+	}
+
+	failCnt.set(0);		       // reset failures -- we got a response
+
+	/*
+	 * If the two states aren't the same, the state changed and we
+	 * need to account for that locally here by calling the method
+	 * that would make the change (the one we should have gotten.
+	 * (We use the external forms of abort, commit, etc., because
+	 * they are what the manager would call, and therefore these
+	 * calls will always do exactly what the incoming manager
+	 * calls would have done.  I don't want this to be fragile by
+	 * bypassing those calls and going straight to the Txn
+	 * object's calls, which might skip something important in the
+	 * OutriggerServerImpl calls).
+	 */
+
+	if (trState != txnState) {
+	    if (logger.isLoggable(Level.FINER)) {
+		logger.log(Level.FINER, 
+		    "{0} mgr state[{1}] != local state [{2}]", 
+		    new Object[]{this,
+				 TxnConstants.getName(trState),
+				 TxnConstants.getName(txnState)});
+	    }
+
+	    try {
+		switch (trState) {
+		  case ABORTED:
+		    logger.log(Level.FINER, "{0} moving to abort", this);
+			  
+		    monitor.space().abort(tr.mgr, tr.id);
+		    return true;
+
+		  case COMMITTED:
+		    logger.log(Level.FINER, "{0} moving to commit", this);
+
+		    monitor.space().commit(tr.mgr, tr.id);
+		    return true;
+		}
+	    } catch (UnknownTransactionException e) {
+		// we must somehow have already gotten the abort() or
+		// commit(), and have therefore forgotten about the
+		// transaction, while this code was executing
+		return true;
+	    } catch (UnmarshalException ume) {
+		throw new AssertionError(ume);
+	    }
+
+	    // we can't fake anything else -- the manager will have to call
+	    // us
+	}
+
+	logger.log(Level.FINEST, "{0} return false", this);
+
+	return false;			// now we know so nothing more to do
+    }
+
+    /**
+     * Add in a resource.  The lease may already be in, in which case it is
+     * ignored, or it may be null, in which case it was a non-leased probe
+     * that was blocked and we simply set <code>mustQuery</code> to
+     * <code>true</code>.
+     */
+    synchronized void add(QueryWatcher query) {
+	if (query == null || query.getExpiration() <= nextQuery.get()) {
+	    if (logger.isLoggable(Level.FINEST))
+		logger.log(Level.FINEST, "adding resource to task -- SHORT");
+	    mustQuery = true;
+	} else {
+	    if (logger.isLoggable(Level.FINEST))
+		logger.log(Level.FINEST, "adding resource to task -- LONG");
+	    if (queries == null)
+		queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
+	    queries.put(query, null);
+	}
+    }
+
+    /** Log failed unpacking attempt attempt */
+    private void logUnpackingFailure(String exceptionDescription, Level level,
+				     boolean terminal, Throwable t) 
+    {
+	if (logger.isLoggable(level)) {
+	    logger.log(level, "Encountered " + exceptionDescription +
+		"while unpacking exception to check state, " +
+		(terminal?"dropping":"keeping") +  " monitoring task", t);
+	}
+    }
+
+}

Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java Sun Jul  5 11:41:39 2020
@@ -26,6 +26,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
 
 /**
  * A type tree for entries.  It maintains, for each class, a list of