You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@river.apache.org by pe...@apache.org on 2020/07/05 11:41:42 UTC
svn commit: r1879521 [31/37] - in
/river/jtsk/modules/modularize/apache-river: ./ browser/
browser/src/main/java/org/apache/river/example/browser/ extra/
groovy-config/ river-activation/ river-collections/
river-collections/src/main/java/org/apache/riv...
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/Txn.java Sun Jul 5 11:41:39 2020
@@ -1,537 +1,540 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.logging.Levels;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.rmi.RemoteException;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.CannotJoinException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-import net.jini.core.transaction.server.TransactionManager;
-
-import net.jini.space.InternalSpaceException;
-import net.jini.security.ProxyPreparer;
-
-/**
- * This class represents a space's state in a single transaction.
- *
- * Object of this class represent Jini transactions within outrigger.
- * These transactions hold "Transactables" -- things that represent
- * the actions that have been taken under this transaction. For example,
- * if this transaction were to be cancelled, the Transactables are
- * examined and serve as the list of things to roll back in order to
- * restore the state of the Space status quo ante.
- *
- * This is achieved by having the transactables notified of state changes
- * to this transaction such as preparing, commit, etc. The
- * transactables themselves are responsible for "doing the right thing."
- *
- * NB: some--but not all--of the actions one can take with a transaction
- * are managed internally by these objects. That is, all of the important
- * methods objects of these types are synchronized. Therefore, two
- * simultaneous calls to abort() and commit() are arbitrated properly.
- *
- * However, care must be taken when add()ing a transactable. Even
- * though the add() method is synchronized if you check the state of
- * the transaction to ensure that is active and then call add() the
- * transaction could have left the active state between the check and
- * the add() call unless the call obtains the appropriate locks. This
- * is even more likely if other work needs to be done in addition to
- * calling add() (e.g. persisting state, obtaining locks, etc.). The
- * caller of add() should lock the associated transaction object and
- * ensure that the transaction is still considered ACTIVE, do whatever
- * work is necessary to complete while the transaction is in the ACTIVE
- * state (including calling call add()) and then release the lock.
- * This can be done by :
- * <ul>
- * <li> holding the lock on this object while checking the
- * state and carrying out the operation (including calling add()), or
- * <li> calling ensureActive() to check the state
- * and obtain a non-exclusive lock, carrying out the operation
- * (including calling add()), and then calling allowStateChange() to
- * release the lock.
- * </ul>
- * The pair of ensureActive() and allowStateChange() allows for more
- * concurrency if the operation is expected to take a long time, in
- * that it will allow for other operations to be performed under the
- * same transaction and let aborts prevent other operations from
- * being started.
- *
- * @author Sun Microsystems, Inc.
- */
-class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
-
- /** The internal id Outrigger as assigned to the transaction */
- final private long id;
-
- /** What state we think the transaction is in */
- private volatile int state;
-
- /**
- * The transaction manager associated with the transaction
- * this object is fronting for.
- */
- private StorableReference trm;
-
- /**
- * Cached <code>ServerTransaction</code> object for
- * the transaction this object is fronting for.
- */
- private ServerTransaction tr;
-
- /** The id the transaction manager assigned to this transaction */
- private volatile long trId;
-
- /**
- * The list of <code>Transactable</code> participating in
- * this transaction.
- */
- final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
-
- /**
- * The task responsible for monitoring to see if this
- * transaction has been aborted with us being told, or
- * null if no such task as been allocated.
- */
- private volatile TxnMonitorTask monitorTask;
-
- /** Count of number of threads holding a read lock on state */
- private int stateReaders = 0;
-
- /**
- * <code>true</code> if there is a blocked state change. Used
- * to give writers priority.
- */
- private boolean stateChangeWaiting = false;
-
- /** Logger for logging transaction related information */
- private static final Logger logger =
- Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
- /**
- * Create a new <code>Txn</code> that represents our state in the
- * given <code>ServerTransaction</code>.
- */
- Txn(ServerTransaction tr, long id) {
- this(id);
- trId = tr.id;
- this.tr = tr;
- this.trm = new StorableReference(tr.mgr);
- state = ACTIVE;
-
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "creating txn for transaction mgr:" +
- "{0}, id:{1}, state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
- }
-
- /** Used in recovery */
- Txn(long id) {
- this.id = id; // the txn id is not persisted
- }
-
- /**
- * Get the id for this txn. Note that this id is NOT the same as
- * the ID of the transaction. Since that ID is not unique (it must
- * be qualified with the <code>ServerTransaction</code> object) we create
- * our own internal id to make txns unique. This is needed since we
- * may not have the <code>Transaction</code> unmarshalled.
- */
- Long getId() {
- return Long.valueOf(id);
- }
-
- /**
- * We keep the transaction ID around because we may need it
- * to identify a broken transaction after recovery.
- */
- long getTransactionId() {
- return trId;
- }
-
- /**
- * Return our local view of the current state. Need to be holding
- * the lock on this object or have called <code>ensureActive</code>
- * to get the current value.
- */
- int getState() {
- return state;
- }
-
- /**
- * Atomically checks that this transaction is in the active
- * state and locks the transaction in the active state.
- * The lock can be released by calling <code>allowStateChange</code>.
- * Each call to this method should be paired with a call to
- * <code>allowStateChange</code> in a finally block.
- * @throws CannotJoinException if the transaction
- * is not active or a state change is pending.
- */
- synchronized void ensureActive() throws CannotJoinException {
- if (state != ACTIVE || stateChangeWaiting) {
- final String msg = "transaction mgr:" + tr + ", id:" + trId +
- " not active, in state " + TxnConstants.getName(state);
- final CannotJoinException e = new CannotJoinException(msg);
- logger.log(Levels.FAILED, msg, e);
- throw e;
- }
- assert stateReaders >= 0;
- stateReaders++;
- }
-
- /**
- * Release the read lock created by an <code>ensureActive</code>
- * call. Does nothing if the transaction is not active or there is
- * a state change pending and thus is safe to call even if the
- * corresponding <code>ensureActive</code> call threw
- * <code>CannotJoinException</code>.
- */
- synchronized void allowStateChange() {
- if (state != ACTIVE || stateChangeWaiting)
- return;
- stateReaders--;
- assert stateReaders >= 0;
- notifyAll();
- }
-
- /**
- * Prevents new operations from being started under this
- * transaction and blocks until in process operations are
- * completed.
- */
- synchronized void makeInactive() {
- stateChangeWaiting = true;
- assert stateReaders >= 0;
- while (stateReaders != 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new AssertionError(e);
- }
- assert stateReaders >= 0;
- }
- }
-
- /**
- * Prepare for transaction commit. <code>makeInactive</code> must have
- * been called on this transaction first.
- */
- synchronized int prepare(OutriggerServerImpl space) {
- assert stateChangeWaiting : "prepare called before makeInactive";
- assert stateReaders == 0 : "prepare called before makeInactive completed";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
- "state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- switch (state) {
- case ABORTED: // previously aborted
- return ABORTED;
-
- case COMMITTED: // previously committed
- throw new IllegalStateException(); // "cannot happen"
-
- case NOTCHANGED: // previously voted NOTCHANGED
- case PREPARED: // previously voted PREPARED
- return state; // they are idempotent, and
- // and we have nothing to do
- // so return
-
- case ACTIVE: // currently active
- boolean changed = false; // did this txn change
- // anything?
-
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
- "{0}, id:{1}, state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- // loop through Transactable members of this Txn
- final Iterator<Transactable> i = txnables.iterator();
- int c=0; // Counter for debugging message
- while (i.hasNext()) {
- // get this member's vote
- final Transactable transactable = i.next();
- final int prepState = transactable.prepare(this, space);
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "prepare:prepared " +
- "transactable {0} for transaction mgr:{1}, id:{2}," +
- " transactable now in state {3}",
- new Object[]{transactable, tr, Long.valueOf(trId),
- TxnConstants.getName(prepState)});
- }
-
- switch (prepState) {
- case PREPARED: // has prepared state
- changed = true; // this means a change
- continue;
-
- case ABORTED: // has to abort
- abort(space); // abort this txn (does cleanup)
- state = ABORTED;
- return state; // vote aborted
-
- case NOTCHANGED: // no change
- i.remove(); // Won't need to call again
- continue;
-
- default: // huh?
- throw new
- InternalSpaceException("prepare said " + prepState);
- }
- }
-
- if (changed) {
- state = PREPARED;
- // have to watch this since it's now holding permanent
- // resources
- space.monitor(Collections.nCopies(1, this));
- } else {
- state = NOTCHANGED;
- }
- break;
-
- default:
- throw new IllegalStateException("unknown Txn state: " + state);
- }
-
- return state;
- }
-
- /**
- * Abort the transaction. This must be callable from
- * <code>prepare</code> because if a <code>Transactable</code>
- * votes <code>ABORT</code>, this method is called to make that
- * happen. <code>makeInactive</code> must have been called on this
- * transaction first.
- */
- synchronized void abort(OutriggerServerImpl space) {
- assert stateChangeWaiting : "abort called before makeInactive";
- assert stateReaders == 0 : "abort called before makeInactive completed";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
- "state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- switch (state) {
- case ABORTED: // already aborted
- case NOTCHANGED: // nothing to abort
- break;
-
- case COMMITTED: // "cannot happen"
- throw new IllegalStateException("aborting a committed txn");
-
- case ACTIVE:
- case PREPARED:
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
- "{0}, id:{1}, state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- final Iterator<Transactable> i = txnables.iterator();
- while (i.hasNext()) {
- i.next().abort(this, space);
- }
- state = ABORTED;
- cleanup();
- break;
-
- default:
- throw new IllegalStateException("unknown Txn state: " + state);
- }
- }
-
- /**
- * Having prepared, roll the changes
- * forward. <code>makeInactive</code> must have been called on
- * this transaction first.
- */
- synchronized void commit(OutriggerServerImpl space) {
- assert stateChangeWaiting : "commit called before makeInactive";
- assert stateReaders == 0 : "commit called before makeInactive completed";
-
- //!! Need to involve mgr here
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
- "state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- switch (state) {
- case ABORTED: // "cannot happen" stuff
- case ACTIVE:
- case NOTCHANGED:
- throw new IllegalStateException("committing "
- + TxnConstants.getName(state) + " txn");
-
- case COMMITTED: // previous committed, that's okay
- return;
-
- case PREPARED: // voted PREPARED, time to finish up
- final Iterator<Transactable> i = txnables.iterator();
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "commit:committing transaction mgr:" +
- "{0}, id:{1}, state:{2}",
- new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
- }
-
- while (i.hasNext()) {
- i.next().commit(this, space);
- }
- state = COMMITTED;
- cleanup();
- return;
-
- default:
- throw new IllegalStateException("unknown Txn state: " + state);
- }
- }
-
- /**
- * Caution: see locking discussion at the class level.
- */
- public synchronized Transactable add(Transactable t) {
- txnables.add(t);
- return t;
- }
-
- // inherit doc comment
- public ServerTransaction getTransaction(ProxyPreparer preparer)
- throws IOException, ClassNotFoundException
- {
- synchronized (this){
- if (tr == null) {
- final TransactionManager mgr =
- (TransactionManager)trm.get(preparer);
- tr = new ServerTransaction(mgr, trId);
- }
- return tr;
- }
- }
-
- /**
- * Return the manager associated with this transaction.
- * @return the manager associated with this transaction.
- * @throws IllegalStateException if this <code>Txn</code>
- * is still broken.
- */
- synchronized TransactionManager getManager() {
- if (tr == null)
- throw new IllegalStateException("Txn is still broken");
- return tr.mgr;
- }
-
- /**
- * Return the monitor task for this object. Note, this
- * method is unsynchronized because it (and
- * <code>monitorTask(TxnMonitorTask)</code> are both called
- * from the same thread.
- */
- TxnMonitorTask monitorTask() {
- return monitorTask;
- }
-
- /**
- * Set the monitor task for this object. Note, this method is
- * unsynchronized because it (and <code>monitorTask()</code> are
- * both called from the same thread.
- */
- void monitorTask(TxnMonitorTask task) {
- monitorTask = task;
- }
-
- /**
- * Clean up any state when the transaction is finished.
- */
- private void cleanup() {
- if (monitorTask != null)
- monitorTask.cancel(false); }
-
- // -----------------------------------
- // Methods required by StorableObject
- // -----------------------------------
-
- // inherit doc comment
- public void store(ObjectOutputStream out) throws IOException {
- /* There is a bunch of stuff we don't need to write. The
- * Txn id not stored since it is handed back during
- * recovery. The content is rebuilt txnables by the various
- * recoverWrite and recoverTake calls. state is not written
- * because it is always ACTIVE when we write, and always
- * needs to be PREPARED when we read it back.
- */
- synchronized (this){
- out.writeObject(trm);
- out.writeLong(trId);
- }
- }
-
- // inherit doc comment
- public Txn restore(ObjectInputStream in)
- throws IOException, ClassNotFoundException
- {
- /* Only transactions that got prepared and not committed or
- * aborted get recovered
- */
- synchronized (this){
- state = PREPARED;
- trm = (StorableReference)in.readObject();
- trId = in.readLong();
- }
- return this;
- }
-
- @Override
- public int compareTo(Txn o) {
- if (o == null) return -1;
- if (o.id < id) return -1;
- if (o.id > id) return 1;
- return 0;
- }
-
- @Override
- public int hashCode() {
- int hash = 7;
- hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
- return hash;
- }
-
- public boolean equals(Object o){
- if (!(o instanceof Txn)) return false;
- Txn txn = (Txn) o;
- return id == txn.id;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.logging.Levels;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.rmi.RemoteException;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Collections;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.CannotJoinException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.core.transaction.server.TransactionManager;
+
+import net.jini.space.InternalSpaceException;
+import net.jini.security.ProxyPreparer;
+import org.apache.river.outrigger.proxy.StorableObject;
+
+
+
+/**
+ * This class represents a space's state in a single transaction.
+ *
+ * Object of this class represent Jini transactions within outrigger.
+ * These transactions hold "Transactables" -- things that represent
+ * the actions that have been taken under this transaction. For example,
+ * if this transaction were to be cancelled, the Transactables are
+ * examined and serve as the list of things to roll back in order to
+ * restore the state of the Space status quo ante.
+ *
+ * This is achieved by having the transactables notified of state changes
+ * to this transaction such as preparing, commit, etc. The
+ * transactables themselves are responsible for "doing the right thing."
+ *
+ * NB: some--but not all--of the actions one can take with a transaction
+ * are managed internally by these objects. That is, all of the important
+ * methods objects of these types are synchronized. Therefore, two
+ * simultaneous calls to abort() and commit() are arbitrated properly.
+ *
+ * However, care must be taken when add()ing a transactable. Even
+ * though the add() method is synchronized if you check the state of
+ * the transaction to ensure that is active and then call add() the
+ * transaction could have left the active state between the check and
+ * the add() call unless the call obtains the appropriate locks. This
+ * is even more likely if other work needs to be done in addition to
+ * calling add() (e.g. persisting state, obtaining locks, etc.). The
+ * caller of add() should lock the associated transaction object and
+ * ensure that the transaction is still considered ACTIVE, do whatever
+ * work is necessary to complete while the transaction is in the ACTIVE
+ * state (including calling call add()) and then release the lock.
+ * This can be done by :
+ * <ul>
+ * <li> holding the lock on this object while checking the
+ * state and carrying out the operation (including calling add()), or
+ * <li> calling ensureActive() to check the state
+ * and obtain a non-exclusive lock, carrying out the operation
+ * (including calling add()), and then calling allowStateChange() to
+ * release the lock.
+ * </ul>
+ * The pair of ensureActive() and allowStateChange() allows for more
+ * concurrency if the operation is expected to take a long time, in
+ * that it will allow for other operations to be performed under the
+ * same transaction and let aborts prevent other operations from
+ * being started.
+ *
+ * @author Sun Microsystems, Inc.
+ */
+class Txn implements TransactableMgr, TransactionConstants, StorableObject<Txn>, Comparable<Txn> {
+
+ /** The internal id Outrigger as assigned to the transaction */
+ final private long id;
+
+ /** What state we think the transaction is in */
+ private volatile int state;
+
+ /**
+ * The transaction manager associated with the transaction
+ * this object is fronting for.
+ */
+ private StorableReference trm;
+
+ /**
+ * Cached <code>ServerTransaction</code> object for
+ * the transaction this object is fronting for.
+ */
+ private ServerTransaction tr;
+
+ /** The id the transaction manager assigned to this transaction */
+ private volatile long trId;
+
+ /**
+ * The list of <code>Transactable</code> participating in
+ * this transaction.
+ */
+ final private List<Transactable> txnables = new java.util.LinkedList<Transactable>();
+
+ /**
+ * The task responsible for monitoring to see if this
+ * transaction has been aborted with us being told, or
+ * null if no such task as been allocated.
+ */
+ private volatile TxnMonitorTask monitorTask;
+
+ /** Count of number of threads holding a read lock on state */
+ private int stateReaders = 0;
+
+ /**
+ * <code>true</code> if there is a blocked state change. Used
+ * to give writers priority.
+ */
+ private boolean stateChangeWaiting = false;
+
+ /** Logger for logging transaction related information */
+ private static final Logger logger =
+ Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+ /**
+ * Create a new <code>Txn</code> that represents our state in the
+ * given <code>ServerTransaction</code>.
+ */
+ Txn(ServerTransaction tr, long id) {
+ this(id);
+ trId = tr.id;
+ this.tr = tr;
+ this.trm = new StorableReference(tr.mgr);
+ state = ACTIVE;
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER, "creating txn for transaction mgr:" +
+ "{0}, id:{1}, state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+ }
+
+ /** Used in recovery */
+ Txn(long id) {
+ this.id = id; // the txn id is not persisted
+ }
+
+ /**
+ * Get the id for this txn. Note that this id is NOT the same as
+ * the ID of the transaction. Since that ID is not unique (it must
+ * be qualified with the <code>ServerTransaction</code> object) we create
+ * our own internal id to make txns unique. This is needed since we
+ * may not have the <code>Transaction</code> unmarshalled.
+ */
+ Long getId() {
+ return Long.valueOf(id);
+ }
+
+ /**
+ * We keep the transaction ID around because we may need it
+ * to identify a broken transaction after recovery.
+ */
+ long getTransactionId() {
+ return trId;
+ }
+
+ /**
+ * Return our local view of the current state. Need to be holding
+ * the lock on this object or have called <code>ensureActive</code>
+ * to get the current value.
+ */
+ int getState() {
+ return state;
+ }
+
+ /**
+ * Atomically checks that this transaction is in the active
+ * state and locks the transaction in the active state.
+ * The lock can be released by calling <code>allowStateChange</code>.
+ * Each call to this method should be paired with a call to
+ * <code>allowStateChange</code> in a finally block.
+ * @throws CannotJoinException if the transaction
+ * is not active or a state change is pending.
+ */
+ synchronized void ensureActive() throws CannotJoinException {
+ if (state != ACTIVE || stateChangeWaiting) {
+ final String msg = "transaction mgr:" + tr + ", id:" + trId +
+ " not active, in state " + TxnConstants.getName(state);
+ final CannotJoinException e = new CannotJoinException(msg);
+ logger.log(Levels.FAILED, msg, e);
+ throw e;
+ }
+ assert stateReaders >= 0;
+ stateReaders++;
+ }
+
+ /**
+ * Release the read lock created by an <code>ensureActive</code>
+ * call. Does nothing if the transaction is not active or there is
+ * a state change pending and thus is safe to call even if the
+ * corresponding <code>ensureActive</code> call threw
+ * <code>CannotJoinException</code>.
+ */
+ synchronized void allowStateChange() {
+ if (state != ACTIVE || stateChangeWaiting)
+ return;
+ stateReaders--;
+ assert stateReaders >= 0;
+ notifyAll();
+ }
+
+ /**
+ * Prevents new operations from being started under this
+ * transaction and blocks until in process operations are
+ * completed.
+ */
+ synchronized void makeInactive() {
+ stateChangeWaiting = true;
+ assert stateReaders >= 0;
+ while (stateReaders != 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ assert stateReaders >= 0;
+ }
+ }
+
+ /**
+ * Prepare for transaction commit. <code>makeInactive</code> must have
+ * been called on this transaction first.
+ */
+ synchronized int prepare(OutriggerServerImpl space) {
+ assert stateChangeWaiting : "prepare called before makeInactive";
+ assert stateReaders == 0 : "prepare called before makeInactive completed";
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
+ "state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ switch (state) {
+ case ABORTED: // previously aborted
+ return ABORTED;
+
+ case COMMITTED: // previously committed
+ throw new IllegalStateException(); // "cannot happen"
+
+ case NOTCHANGED: // previously voted NOTCHANGED
+ case PREPARED: // previously voted PREPARED
+ return state; // they are idempotent, and
+ // and we have nothing to do
+ // so return
+
+ case ACTIVE: // currently active
+ boolean changed = false; // did this txn change
+ // anything?
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "prepare:preparing transaction mgr:" +
+ "{0}, id:{1}, state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ // loop through Transactable members of this Txn
+ final Iterator<Transactable> i = txnables.iterator();
+ int c=0; // Counter for debugging message
+ while (i.hasNext()) {
+ // get this member's vote
+ final Transactable transactable = i.next();
+ final int prepState = transactable.prepare(this, space);
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "prepare:prepared " +
+ "transactable {0} for transaction mgr:{1}, id:{2}," +
+ " transactable now in state {3}",
+ new Object[]{transactable, tr, Long.valueOf(trId),
+ TxnConstants.getName(prepState)});
+ }
+
+ switch (prepState) {
+ case PREPARED: // has prepared state
+ changed = true; // this means a change
+ continue;
+
+ case ABORTED: // has to abort
+ abort(space); // abort this txn (does cleanup)
+ state = ABORTED;
+ return state; // vote aborted
+
+ case NOTCHANGED: // no change
+ i.remove(); // Won't need to call again
+ continue;
+
+ default: // huh?
+ throw new
+ InternalSpaceException("prepare said " + prepState);
+ }
+ }
+
+ if (changed) {
+ state = PREPARED;
+ // have to watch this since it's now holding permanent
+ // resources
+ space.monitor(Collections.nCopies(1, this));
+ } else {
+ state = NOTCHANGED;
+ }
+ break;
+
+ default:
+ throw new IllegalStateException("unknown Txn state: " + state);
+ }
+
+ return state;
+ }
+
+ /**
+ * Abort the transaction. This must be callable from
+ * <code>prepare</code> because if a <code>Transactable</code>
+ * votes <code>ABORT</code>, this method is called to make that
+ * happen. <code>makeInactive</code> must have been called on this
+ * transaction first.
+ */
+ synchronized void abort(OutriggerServerImpl space) {
+ assert stateChangeWaiting : "abort called before makeInactive";
+ assert stateReaders == 0 : "abort called before makeInactive completed";
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
+ "state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ switch (state) {
+ case ABORTED: // already aborted
+ case NOTCHANGED: // nothing to abort
+ break;
+
+ case COMMITTED: // "cannot happen"
+ throw new IllegalStateException("aborting a committed txn");
+
+ case ACTIVE:
+ case PREPARED:
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "abort:aborting transaction mgr:" +
+ "{0}, id:{1}, state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ final Iterator<Transactable> i = txnables.iterator();
+ while (i.hasNext()) {
+ i.next().abort(this, space);
+ }
+ state = ABORTED;
+ cleanup();
+ break;
+
+ default:
+ throw new IllegalStateException("unknown Txn state: " + state);
+ }
+ }
+
+ /**
+ * Having prepared, roll the changes
+ * forward. <code>makeInactive</code> must have been called on
+ * this transaction first.
+ */
+ synchronized void commit(OutriggerServerImpl space) {
+ assert stateChangeWaiting : "commit called before makeInactive";
+ assert stateReaders == 0 : "commit called before makeInactive completed";
+
+ //!! Need to involve mgr here
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
+ "state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ switch (state) {
+ case ABORTED: // "cannot happen" stuff
+ case ACTIVE:
+ case NOTCHANGED:
+ throw new IllegalStateException("committing "
+ + TxnConstants.getName(state) + " txn");
+
+ case COMMITTED: // previous committed, that's okay
+ return;
+
+ case PREPARED: // voted PREPARED, time to finish up
+ final Iterator<Transactable> i = txnables.iterator();
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "commit:committing transaction mgr:" +
+ "{0}, id:{1}, state:{2}",
+ new Object[]{tr, Long.valueOf(trId), TxnConstants.getName(state)});
+ }
+
+ while (i.hasNext()) {
+ i.next().commit(this, space);
+ }
+ state = COMMITTED;
+ cleanup();
+ return;
+
+ default:
+ throw new IllegalStateException("unknown Txn state: " + state);
+ }
+ }
+
+ /**
+ * Caution: see locking discussion at the class level.
+ */
+ public synchronized Transactable add(Transactable t) {
+ txnables.add(t);
+ return t;
+ }
+
+ // inherit doc comment
+ public ServerTransaction getTransaction(ProxyPreparer preparer)
+ throws IOException, ClassNotFoundException
+ {
+ synchronized (this){
+ if (tr == null) {
+ final TransactionManager mgr =
+ (TransactionManager)trm.get(preparer);
+ tr = new ServerTransaction(mgr, trId);
+ }
+ return tr;
+ }
+ }
+
+ /**
+ * Return the manager associated with this transaction.
+ * @return the manager associated with this transaction.
+ * @throws IllegalStateException if this <code>Txn</code>
+ * is still broken.
+ */
+ synchronized TransactionManager getManager() {
+ if (tr == null)
+ throw new IllegalStateException("Txn is still broken");
+ return tr.mgr;
+ }
+
+ /**
+ * Return the monitor task for this object. Note, this
+ * method is unsynchronized because it (and
+ * <code>monitorTask(TxnMonitorTask)</code> are both called
+ * from the same thread.
+ */
+ TxnMonitorTask monitorTask() {
+ return monitorTask;
+ }
+
+ /**
+ * Set the monitor task for this object. Note, this method is
+ * unsynchronized because it (and <code>monitorTask()</code> are
+ * both called from the same thread.
+ */
+ void monitorTask(TxnMonitorTask task) {
+ monitorTask = task;
+ }
+
+ /**
+ * Clean up any state when the transaction is finished.
+ */
+ private void cleanup() {
+ if (monitorTask != null)
+ monitorTask.cancel(false); }
+
+ // -----------------------------------
+ // Methods required by StorableObject
+ // -----------------------------------
+
+ // inherit doc comment
+ public void store(ObjectOutputStream out) throws IOException {
+ /* There is a bunch of stuff we don't need to write. The
+ * Txn id not stored since it is handed back during
+ * recovery. The content is rebuilt txnables by the various
+ * recoverWrite and recoverTake calls. state is not written
+ * because it is always ACTIVE when we write, and always
+ * needs to be PREPARED when we read it back.
+ */
+ synchronized (this){
+ out.writeObject(trm);
+ out.writeLong(trId);
+ }
+ }
+
+ // inherit doc comment
+ public Txn restore(ObjectInputStream in)
+ throws IOException, ClassNotFoundException
+ {
+ /* Only transactions that got prepared and not committed or
+ * aborted get recovered
+ */
+ synchronized (this){
+ state = PREPARED;
+ trm = (StorableReference)in.readObject();
+ trId = in.readLong();
+ }
+ return this;
+ }
+
+ @Override
+ public int compareTo(Txn o) {
+ if (o == null) return -1;
+ if (o.id < id) return -1;
+ if (o.id > id) return 1;
+ return 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 31 * hash + (int) (this.id ^ (this.id >>> 32));
+ return hash;
+ }
+
+ public boolean equals(Object o){
+ if (!(o instanceof Txn)) return false;
+ Txn txn = (Txn) o;
+ return id == txn.id;
+ }
+}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitor.java Sun Jul 5 11:41:39 2020
@@ -18,7 +18,7 @@
package org.apache.river.outrigger;
import org.apache.river.config.Config;
-import org.apache.river.thread.WakeupManager;
+import org.apache.river.thread.wakeup.WakeupManager;
import net.jini.config.Configuration;
import net.jini.config.ConfigurationException;
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TxnMonitorTask.java Sun Jul 5 11:41:39 2020
@@ -1,584 +1,584 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.river.outrigger;
-
-import org.apache.river.constants.TxnConstants;
-import org.apache.river.constants.ThrowableConstants;
-import org.apache.river.logging.Levels;
-import org.apache.river.thread.RetryTask;
-import org.apache.river.thread.WakeupManager;
-
-import java.io.IOException;
-import java.rmi.RemoteException;
-import java.rmi.UnmarshalException;
-import java.rmi.NoSuchObjectException;
-import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.Iterator;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import net.jini.core.transaction.TransactionException;
-import net.jini.core.transaction.UnknownTransactionException;
-import net.jini.core.transaction.server.ServerTransaction;
-import net.jini.core.transaction.server.TransactionConstants;
-
-/**
- * A task that will try to validate the state of a transaction. This
- * uses weak references a good deal to let the other parts of the system
- * be GC'ed as necessary.
- * <p>
- * The retry mechanism is subtle, so bear with me. The purpose is
- * to ensure that if any activity is being blocked by a given
- * transaction, that transaction will be tested at some point in
- * the future (if necessary, i.e., if it still is thought to be
- * active). We assume it to be rare that a transactions that the
- * space thinks is active is, in fact, aborted, so the algorithm is
- * designed to guarantee the detection without a lot of overhead,
- * specifically without a lot of RMI calls.
- * <p>
- * Each task has three values: a <code>nextQuery</code> time, a
- * <code>mustQuery</code> boolean that force the next query to be
- * made, and <code>deltaT</code>, the time at which the following
- * query will be scheduled. When the task is awakened at its
- * <code>nextQuery</code> time, it checks to see if it must make an
- * actual query to the transaction manager, which it will do if either
- * <code>mustQuery</code> is <code>true</code>, or if we know about
- * any in progress queries on the space that are blocked on the
- * transaction. Whether or not an actual query is made,
- * <code>deltaT</code> is added to <code>nextQuery</code> to get the
- * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
- * <code>mustQuery</code> boolean is set to <code>false</code>.
- * <p>
- * There are two kinds of requests that a with which transaction
- * can cause a conflict -- those with long timeouts (such as
- * blocking reads and takes) and those that are under short timeouts
- * (such as reads and takes with zero-length timeouts). We will
- * treat them separately at several points of the algorithm. A
- * short timeout is any query whose expiration time is sooner than
- * the <code>nextQuery</code> time. Any other timeout is long
- * If a short query arrives, <code>mustQuery</code> is set to
- * <code>true</code>.
- * <p>
- * The result is that any time a transaction causes a conflict, if
- * the query on the space has not ended by the time of the
- * <code>nextQuery</code> we will attempt to poll the transaction manager.
- * There will also poll the transaction manager if any conflict occurred
- * on a query on the space with a short timeout.
- * <p>
- * The first time a transaction causes a conflict, we schedule a
- * time in the future at which we will poll its status. We do not
- * poll right away because often a transaction will complete on
- * its own before we get to that time, making the check
- * unnecessary. An instant poll is, therefore, unnecessarily
- * aggressive, since giving an initial grace time will usually mean
- * no poll is made at all. So if the first conflict occurs at
- * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
- * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
- * will be <code>true</code> to force that poll to happen, and
- * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
- *
- * @author Sun Microsystems, Inc.
- *
- * @see TxnMonitor
- */
-class TxnMonitorTask extends RetryTask
- implements TransactionConstants, org.apache.river.constants.TimeConstants
-{
- /** transaction being monitored */
- private final Txn txn;
-
- /** the monitor we were made by */
- private final TxnMonitor monitor;
-
- /**
- * All the queries on the space (not queries to the transaction
- * manager) waiting for <code>txn</code> to be resolved.
- * <code>null</code> until we have at least one. Represented by
- * <code>QueryWatcher</code> objects.
- */
- private Map<QueryWatcher,Collection<Txn>> queries; //Sync on this.
-
- /** count of RemoteExceptions */
- private final AtomicInteger failCnt;
-
- /**
- * The next time we need to poll the transaction manager
- * to get <code>txn</code>'s actual state.
- */
- private final AtomicLong nextQuery;
-
- /**
- * When we're given an opportunity to poll the transaction manager
- * for the <code>txn</code>'s state, do so.
- */
- private volatile boolean mustQuery;
-
- /** next value added to <code>nextQuery</code> */
- private volatile long deltaT;
-
- /**
- * The initial grace period before the first query.
- */
- private static final long INITIAL_GRACE = 15 * SECONDS;
-
- /**
- * The retry time when we have an encountered an exception
- */
- private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS;
-
- /**
- * The largest value that <code>deltaT</code> will reach.
- */
- private static final long MAX_DELTA_T = 1 * HOURS;
-
- /**
- * The maximum number of failures allowed in a row before we simply
- * give up on the transaction and consider it aborted.
- */
- private static final int MAX_FAILURES = 3;
-
- /** Logger for logging transaction related information */
- private static final Logger logger =
- Logger.getLogger(OutriggerServerImpl.txnLoggerName);
-
- /**
- * Create a new TxnMonitorTask.
- */
- TxnMonitorTask(Txn txn, TxnMonitor monitor,
- ExecutorService manager, WakeupManager wakeupMgr) {
- super(manager, wakeupMgr);
- this.txn = txn;
- this.monitor = monitor;
- nextQuery = new AtomicLong(startTime()); // retryTime will add INITIAL_GRACE
- deltaT = INITIAL_GRACE;
- mustQuery = true;
- failCnt = new AtomicInteger();
- }
-
- /**
- * Return the time of the next query, bumping <code>deltaT</code> as
- * necessary for the next iteration. If the transaction has voted
- * <code>PREPARED</code> or the manager has been giving us a
- * <code>RemoteException</code>, we should retry on short times;
- * otherwise we back off quickly.
- */
- public long retryTime() {
- boolean noFailures = false;
- synchronized (txn){
- noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
- }
- if (noFailures) { // no failures
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} retryTime adds {1}",
- new Object[]{this, Long.valueOf(deltaT)});
- }
-
- nextQuery.addAndGet(deltaT);
- synchronized (this){
- if (deltaT < MAX_DELTA_T)
- deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
- }
- } else {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})",
- new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS),
- (failCnt.get() != 0 ? "failure" : "PREPARED")});
- }
- nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
- }
-
- return nextQuery.get();
- }
-
- /**
- * Add a ``sibling'' transaction, one that is now blocking progress
- * on one of the same entries. For example, if a client is blocked
- * on a <code>read</code>, another transaction can read the same
- * entry, thereby also blocking that same client. This means that
- * the transaction for the second <code>read</code> must be
- * watched, too. The list of queries for the second transaction
- * might be less that the list of those in this transaction, but
- * the process of figuring out the subset is too expensive, since
- * we have tried to make the checking process itself cheap,
- * anyway. So we add all queries this task is currently monitoring
- * to the task monitoring the second transaction. If there are
- * no queries, then the blocking occurred because of a short query
- * or all the queries have expired, in which case the second transaction
- * isn't blocking the way of anything currently, so this method does
- * nothing.
- * <p>
- * Of course, in order to avoid blocking the thread that is calling
- * this (which is trying to perform a <code>read</code>, after
- * all), we simply add each lease in this task to the monitor's
- * queue.
- *
- */
- // @see TxnEntryHandle#monitor
- //!! Would it be worth the overhead to make TxnEntryHandle.monitor
- //!! search for the transaction with the smallest set of leases? -arnold
- synchronized void addSibling(Txn txn) {
- if (queries == null || queries.size() == 0)
- return;
- Collection<Txn> sibling = Collections.nCopies(1, txn);
- Iterator<QueryWatcher> it = queries.keySet().iterator();
- while (it.hasNext()) {
- QueryWatcher query = it.next();
- if (query != null) // from a weak map, so might be null
- monitor.add(query, sibling);
- }
- }
-
- /**
- * Try to see if this transaction should be aborted. This returns
- * <code>true</code> (don't repeat the task) if it knows that the
- * transaction is no longer interesting to anyone.
- */
- public boolean tryOnce() {
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}",
- new Object[]{this, Integer.valueOf(attempt()),
- Boolean.valueOf(mustQuery) });
- }
-
- /*
- * The first time we do nothing, since RetryTask invokes run first,
- * but we want to wait a bit before testing the transaction.
- */
- if (attempt() == 0)
- return false;
- int txnState;
- synchronized (txn){
- txnState = txn.getState();
- }
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} txn.getState() = {1}",
- new Object[]{this, Integer.valueOf(txnState)});
- }
-
- // not active or prepared == no longer blocking
-
- if (txnState != ACTIVE && txnState != PREPARED)
- return true;
-
- // if we're prepared, test every time -- this shouldn't take long
- mustQuery |= (txnState == PREPARED);
-
- /*
- * Go through the resources to see if we can find one still active
- * that cares. Must be synchronized since we test, then clear --
- * another thread that set the flag between the test and clear
- * would have its requirements lost.
- */
- synchronized (this) {
- if (!mustQuery) { // then try resources
- if (queries == null) // no resources, so nobody wants it
- return false; // try again next time
-
- Iterator<QueryWatcher> it = queries.keySet().iterator();
- boolean foundNeed = false;
-
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} nextQuery {1}",
- new Object[]{this, nextQuery});
- }
-
- while (it.hasNext()) {
- QueryWatcher query = it.next();
- if (query == null) // gone -- the map will reap it
- continue;
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST,
- "{0} query.getExpiration() {1}",
- new Object[]{this,
- Long.valueOf(query.getExpiration())});
- }
-
- if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
- it.remove();
- } // expired, so we don't care about it
- else {
- foundNeed = true;
- break;
- }
- }
-
- if (logger.isLoggable(Level.FINEST)) {
- logger.log(Level.FINEST, "{0} foundNeed = {1}",
- new Object[]{this, Boolean.valueOf(foundNeed)});
- }
-
- if (!foundNeed) // nobody wants it
- return false; // try again next time
- }
- mustQuery = false; // clear it for next time
- }
-
- /*
- * Now we know (a) the transaction itself is alive, and (b) some
- * lease still cares. Make sure it's still active as far as the
- * it knows, and if it is, then ask the manager about it.
- */
- ServerTransaction tr;
- try {
- /* This may fix a broken Txn, if it does it won't get moved
- * from the broken to the unbroken list. It will get
- * moved eventually, but it does seem unfortunate it does
- * not happen immediately
- */
- tr = txn.getTransaction(
- monitor.space().getRecoveredTransactionManagerPreparer());
- } catch (RemoteException e) {
- final int cat = ThrowableConstants.retryable(e);
-
- if (cat == ThrowableConstants.BAD_INVOCATION ||
- cat == ThrowableConstants.BAD_OBJECT)
- {
- // Not likely to get better, give up
- logUnpackingFailure("definite exception", Level.INFO,
- true, e);
- return true;
- } else if (cat == ThrowableConstants.INDEFINITE) {
- // try, try, again
- logUnpackingFailure("indefinite exception", Levels.FAILED,
- false, e);
- mustQuery = true;
- return false;
- } else if (cat == ThrowableConstants.UNCATEGORIZED) {
- // Same as above but log differently.
- mustQuery = true;
- logUnpackingFailure("uncategorized exception", Level.INFO,
- false, e);
- return false;
- } else {
- logger.log(Level.WARNING, "ThrowableConstants.retryable " +
- "returned out of range value, " + cat,
- new AssertionError(e));
- return false;
- }
- } catch (IOException e) {
- // Not likely to get better
- logUnpackingFailure("IOException", Level.INFO, true, e);
- return true;
- } catch (RuntimeException e) {
- // Not likely to get better
- logUnpackingFailure("RuntimeException", Level.INFO, true, e);
- return true;
- } catch (ClassNotFoundException e) {
- // codebase probably down, keep trying
- logUnpackingFailure("ClassNotFoundException", Levels.FAILED,
- false, e);
- mustQuery = true;
- return false;
- }
-
- if (logger.isLoggable(Level.FINEST))
- logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
-
- int trState;
- try {
- trState = tr.getState();
- } catch (TransactionException e) {
- if (logger.isLoggable(Level.INFO))
- logger.log(Level.INFO, "Got TransactionException when " +
- "calling getState on " + tr + ", dropping transaction " +
- tr.id, e);
- trState = ABORTED;
- } catch (NoSuchObjectException e) {
- /* It would be epsilon better to to give up immediately
- * if we get a NoSuchObjectException and we are in the
- * active state, however, the code to do this would
- * be very complicated since we need to hold a lock to
- * while reading and acting on the state.
- */
- if (failCnt.incrementAndGet() >= MAX_FAILURES) {
- if (logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, "Got NoSuchObjectException when " +
- "calling getState on " + tr + ", this was the " +
- failCnt + " RemoteException, dropping transaction" +
- tr.id, e);
- }
- trState = ABORTED;
- } else {
- if (logger.isLoggable(Levels.FAILED)) {
- logger.log(Levels.FAILED, "Got NoSuchObjectException " +
- "when calling getState on " + tr + ", failCount = " +
- failCnt + ", will retry", e);
- }
- mustQuery = true; // keep on trying
- return false; // try again next time
- }
- } catch (RemoteException e) {
- if (failCnt.incrementAndGet() >= MAX_FAILURES) {
- /* abort if we are not prepared and not already
- * aborted. If prepared retry, otherwise
- * we're done. Check state and make any abort() call
- * atomically so we can't accidently abort a prepared
- * transaction.
- */
- synchronized (txn) {
- switch (txn.getState()) {
- case ACTIVE:
- // Safe to abort, give up
- if (logger.isLoggable(Level.INFO)) {
- logger.log(Level.INFO, "Got RemoteException " +
- "when calling getState on " + tr + ", this " +
- "was " + failCnt + " RemoteException, " +
- "dropping active transaction " + tr.id, e);
- }
-
- try {
- monitor.space().abort(tr.mgr, tr.id);
- return true;
- } catch (UnknownTransactionException ute) {
- throw new AssertionError(ute);
- } catch (UnmarshalException ume) {
- throw new AssertionError(ume);
- }
- case PREPARED:
- final Level l = (failCnt.get()%MAX_FAILURES == 0)?
- Level.INFO:Levels.FAILED;
- if (logger.isLoggable(l)) {
- logger.log(l, "Got RemoteException when calling " +
- "getState on " + tr + ", this was " +
- failCnt + " RemoteException, will keep " +
- "prepared transaction " + tr.id, e);
- }
-
- // Can't give up, keep on trying to find real state
- mustQuery = true;
- return false;
- case ABORTED:
- case COMMITTED:
- // done
- return true;
- default:
- throw new AssertionError("Txn in unreachable state");
- }
- }
- } else {
- // Don't know, but not ready to give up
- if (logger.isLoggable(Levels.FAILED)) {
- logger.log(Levels.FAILED, "Got RemoteException when " +
- "calling getState on " + tr + ", failCount = " +
- failCnt + ", will retry", e);
- }
-
- mustQuery = true; // keep on trying
- return false; // try again next time
- }
- }
-
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "{0} trState = {1}",
- new Object[]{this, Integer.valueOf(trState)});
- }
-
- failCnt.set(0); // reset failures -- we got a response
-
- /*
- * If the two states aren't the same, the state changed and we
- * need to account for that locally here by calling the method
- * that would make the change (the one we should have gotten.
- * (We use the external forms of abort, commit, etc., because
- * they are what the manager would call, and therefore these
- * calls will always do exactly what the incoming manager
- * calls would have done. I don't want this to be fragile by
- * bypassing those calls and going straight to the Txn
- * object's calls, which might skip something important in the
- * OutriggerServerImpl calls).
- */
-
- if (trState != txnState) {
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER,
- "{0} mgr state[{1}] != local state [{2}]",
- new Object[]{this,
- TxnConstants.getName(trState),
- TxnConstants.getName(txnState)});
- }
-
- try {
- switch (trState) {
- case ABORTED:
- logger.log(Level.FINER, "{0} moving to abort", this);
-
- monitor.space().abort(tr.mgr, tr.id);
- return true;
-
- case COMMITTED:
- logger.log(Level.FINER, "{0} moving to commit", this);
-
- monitor.space().commit(tr.mgr, tr.id);
- return true;
- }
- } catch (UnknownTransactionException e) {
- // we must somehow have already gotten the abort() or
- // commit(), and have therefore forgotten about the
- // transaction, while this code was executing
- return true;
- } catch (UnmarshalException ume) {
- throw new AssertionError(ume);
- }
-
- // we can't fake anything else -- the manager will have to call
- // us
- }
-
- logger.log(Level.FINEST, "{0} return false", this);
-
- return false; // now we know so nothing more to do
- }
-
- /**
- * Add in a resource. The lease may already be in, in which case it is
- * ignored, or it may be null, in which case it was a non-leased probe
- * that was blocked and we simply set <code>mustQuery</code> to
- * <code>true</code>.
- */
- synchronized void add(QueryWatcher query) {
- if (query == null || query.getExpiration() <= nextQuery.get()) {
- if (logger.isLoggable(Level.FINEST))
- logger.log(Level.FINEST, "adding resource to task -- SHORT");
- mustQuery = true;
- } else {
- if (logger.isLoggable(Level.FINEST))
- logger.log(Level.FINEST, "adding resource to task -- LONG");
- if (queries == null)
- queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
- queries.put(query, null);
- }
- }
-
- /** Log failed unpacking attempt attempt */
- private void logUnpackingFailure(String exceptionDescription, Level level,
- boolean terminal, Throwable t)
- {
- if (logger.isLoggable(level)) {
- logger.log(level, "Encountered " + exceptionDescription +
- "while unpacking exception to check state, " +
- (terminal?"dropping":"keeping") + " monitoring task", t);
- }
- }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.river.outrigger;
+
+import org.apache.river.constants.TxnConstants;
+import org.apache.river.constants.ThrowableConstants;
+import org.apache.river.logging.Levels;
+import org.apache.river.thread.wakeup.RetryTask;
+import org.apache.river.thread.wakeup.WakeupManager;
+
+import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.UnmarshalException;
+import java.rmi.NoSuchObjectException;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import net.jini.core.transaction.TransactionException;
+import net.jini.core.transaction.UnknownTransactionException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+
+/**
+ * A task that will try to validate the state of a transaction. This
+ * uses weak references a good deal to let the other parts of the system
+ * be GC'ed as necessary.
+ * <p>
+ * The retry mechanism is subtle, so bear with me. The purpose is
+ * to ensure that if any activity is being blocked by a given
+ * transaction, that transaction will be tested at some point in
+ * the future (if necessary, i.e., if it still is thought to be
+ * active). We assume it to be rare that a transactions that the
+ * space thinks is active is, in fact, aborted, so the algorithm is
+ * designed to guarantee the detection without a lot of overhead,
+ * specifically without a lot of RMI calls.
+ * <p>
+ * Each task has three values: a <code>nextQuery</code> time, a
+ * <code>mustQuery</code> boolean that force the next query to be
+ * made, and <code>deltaT</code>, the time at which the following
+ * query will be scheduled. When the task is awakened at its
+ * <code>nextQuery</code> time, it checks to see if it must make an
+ * actual query to the transaction manager, which it will do if either
+ * <code>mustQuery</code> is <code>true</code>, or if we know about
+ * any in progress queries on the space that are blocked on the
+ * transaction. Whether or not an actual query is made,
+ * <code>deltaT</code> is added to <code>nextQuery</code> to get the
+ * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
+ * <code>mustQuery</code> boolean is set to <code>false</code>.
+ * <p>
+ * There are two kinds of requests that a with which transaction
+ * can cause a conflict -- those with long timeouts (such as
+ * blocking reads and takes) and those that are under short timeouts
+ * (such as reads and takes with zero-length timeouts). We will
+ * treat them separately at several points of the algorithm. A
+ * short timeout is any query whose expiration time is sooner than
+ * the <code>nextQuery</code> time. Any other timeout is long
+ * If a short query arrives, <code>mustQuery</code> is set to
+ * <code>true</code>.
+ * <p>
+ * The result is that any time a transaction causes a conflict, if
+ * the query on the space has not ended by the time of the
+ * <code>nextQuery</code> we will attempt to poll the transaction manager.
+ * There will also poll the transaction manager if any conflict occurred
+ * on a query on the space with a short timeout.
+ * <p>
+ * The first time a transaction causes a conflict, we schedule a
+ * time in the future at which we will poll its status. We do not
+ * poll right away because often a transaction will complete on
+ * its own before we get to that time, making the check
+ * unnecessary. An instant poll is, therefore, unnecessarily
+ * aggressive, since giving an initial grace time will usually mean
+ * no poll is made at all. So if the first conflict occurs at
+ * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
+ * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
+ * will be <code>true</code> to force that poll to happen, and
+ * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
+ *
+ * @author Sun Microsystems, Inc.
+ *
+ * @see TxnMonitor
+ */
+class TxnMonitorTask extends RetryTask
+ implements TransactionConstants, org.apache.river.constants.TimeConstants
+{
+ /** transaction being monitored */
+ private final Txn txn;
+
+ /** the monitor we were made by */
+ private final TxnMonitor monitor;
+
+ /**
+ * All the queries on the space (not queries to the transaction
+ * manager) waiting for <code>txn</code> to be resolved.
+ * <code>null</code> until we have at least one. Represented by
+ * <code>QueryWatcher</code> objects.
+ */
+ private Map<QueryWatcher,Collection<Txn>> queries; //Sync on this.
+
+ /** count of RemoteExceptions */
+ private final AtomicInteger failCnt;
+
+ /**
+ * The next time we need to poll the transaction manager
+ * to get <code>txn</code>'s actual state.
+ */
+ private final AtomicLong nextQuery;
+
+ /**
+ * When we're given an opportunity to poll the transaction manager
+ * for the <code>txn</code>'s state, do so.
+ */
+ private volatile boolean mustQuery;
+
+ /** next value added to <code>nextQuery</code> */
+ private volatile long deltaT;
+
+ /**
+ * The initial grace period before the first query.
+ */
+ private static final long INITIAL_GRACE = 15 * SECONDS;
+
+ /**
+ * The retry time when we have an encountered an exception
+ */
+ private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS;
+
+ /**
+ * The largest value that <code>deltaT</code> will reach.
+ */
+ private static final long MAX_DELTA_T = 1 * HOURS;
+
+ /**
+ * The maximum number of failures allowed in a row before we simply
+ * give up on the transaction and consider it aborted.
+ */
+ private static final int MAX_FAILURES = 3;
+
+ /** Logger for logging transaction related information */
+ private static final Logger logger =
+ Logger.getLogger(OutriggerServerImpl.txnLoggerName);
+
+ /**
+ * Create a new TxnMonitorTask.
+ */
+ TxnMonitorTask(Txn txn, TxnMonitor monitor,
+ ExecutorService manager, WakeupManager wakeupMgr) {
+ super(manager, wakeupMgr);
+ this.txn = txn;
+ this.monitor = monitor;
+ nextQuery = new AtomicLong(startTime()); // retryTime will add INITIAL_GRACE
+ deltaT = INITIAL_GRACE;
+ mustQuery = true;
+ failCnt = new AtomicInteger();
+ }
+
+ /**
+ * Return the time of the next query, bumping <code>deltaT</code> as
+ * necessary for the next iteration. If the transaction has voted
+ * <code>PREPARED</code> or the manager has been giving us a
+ * <code>RemoteException</code>, we should retry on short times;
+ * otherwise we back off quickly.
+ */
+ public long retryTime() {
+ boolean noFailures = false;
+ synchronized (txn){
+ noFailures = (failCnt.get() == 0 && txn.getState() != PREPARED);
+ }
+ if (noFailures) { // no failures
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} retryTime adds {1}",
+ new Object[]{this, Long.valueOf(deltaT)});
+ }
+
+ nextQuery.addAndGet(deltaT);
+ synchronized (this){
+ if (deltaT < MAX_DELTA_T)
+ deltaT = Math.min(deltaT * 2, MAX_DELTA_T);
+ }
+ } else {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} retryTime adds {1} (for {2})",
+ new Object[]{this, Long.valueOf(BETWEEN_EXCEPTIONS),
+ (failCnt.get() != 0 ? "failure" : "PREPARED")});
+ }
+ nextQuery.addAndGet(BETWEEN_EXCEPTIONS);
+ }
+
+ return nextQuery.get();
+ }
+
+ /**
+ * Add a ``sibling'' transaction, one that is now blocking progress
+ * on one of the same entries. For example, if a client is blocked
+ * on a <code>read</code>, another transaction can read the same
+ * entry, thereby also blocking that same client. This means that
+ * the transaction for the second <code>read</code> must be
+ * watched, too. The list of queries for the second transaction
+ * might be less that the list of those in this transaction, but
+ * the process of figuring out the subset is too expensive, since
+ * we have tried to make the checking process itself cheap,
+ * anyway. So we add all queries this task is currently monitoring
+ * to the task monitoring the second transaction. If there are
+ * no queries, then the blocking occurred because of a short query
+ * or all the queries have expired, in which case the second transaction
+ * isn't blocking the way of anything currently, so this method does
+ * nothing.
+ * <p>
+ * Of course, in order to avoid blocking the thread that is calling
+ * this (which is trying to perform a <code>read</code>, after
+ * all), we simply add each lease in this task to the monitor's
+ * queue.
+ *
+ */
+ // @see TxnEntryHandle#monitor
+ //!! Would it be worth the overhead to make TxnEntryHandle.monitor
+ //!! search for the transaction with the smallest set of leases? -arnold
+ synchronized void addSibling(Txn txn) {
+ if (queries == null || queries.size() == 0)
+ return;
+ Collection<Txn> sibling = Collections.nCopies(1, txn);
+ Iterator<QueryWatcher> it = queries.keySet().iterator();
+ while (it.hasNext()) {
+ QueryWatcher query = it.next();
+ if (query != null) // from a weak map, so might be null
+ monitor.add(query, sibling);
+ }
+ }
+
+ /**
+ * Try to see if this transaction should be aborted. This returns
+ * <code>true</code> (don't repeat the task) if it knows that the
+ * transaction is no longer interesting to anyone.
+ */
+ public boolean tryOnce() {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} attempt {1} mustQuery:{2}",
+ new Object[]{this, Integer.valueOf(attempt()),
+ Boolean.valueOf(mustQuery) });
+ }
+
+ /*
+ * The first time we do nothing, since RetryTask invokes run first,
+ * but we want to wait a bit before testing the transaction.
+ */
+ if (attempt() == 0)
+ return false;
+ int txnState;
+ synchronized (txn){
+ txnState = txn.getState();
+ }
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} txn.getState() = {1}",
+ new Object[]{this, Integer.valueOf(txnState)});
+ }
+
+ // not active or prepared == no longer blocking
+
+ if (txnState != ACTIVE && txnState != PREPARED)
+ return true;
+
+ // if we're prepared, test every time -- this shouldn't take long
+ mustQuery |= (txnState == PREPARED);
+
+ /*
+ * Go through the resources to see if we can find one still active
+ * that cares. Must be synchronized since we test, then clear --
+ * another thread that set the flag between the test and clear
+ * would have its requirements lost.
+ */
+ synchronized (this) {
+ if (!mustQuery) { // then try resources
+ if (queries == null) // no resources, so nobody wants it
+ return false; // try again next time
+
+ Iterator<QueryWatcher> it = queries.keySet().iterator();
+ boolean foundNeed = false;
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} nextQuery {1}",
+ new Object[]{this, nextQuery});
+ }
+
+ while (it.hasNext()) {
+ QueryWatcher query = it.next();
+ if (query == null) // gone -- the map will reap it
+ continue;
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST,
+ "{0} query.getExpiration() {1}",
+ new Object[]{this,
+ Long.valueOf(query.getExpiration())});
+ }
+
+ if (query.getExpiration() < nextQuery.get() || query.isResolved()) {
+ it.remove();
+ } // expired, so we don't care about it
+ else {
+ foundNeed = true;
+ break;
+ }
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.log(Level.FINEST, "{0} foundNeed = {1}",
+ new Object[]{this, Boolean.valueOf(foundNeed)});
+ }
+
+ if (!foundNeed) // nobody wants it
+ return false; // try again next time
+ }
+ mustQuery = false; // clear it for next time
+ }
+
+ /*
+ * Now we know (a) the transaction itself is alive, and (b) some
+ * lease still cares. Make sure it's still active as far as the
+ * it knows, and if it is, then ask the manager about it.
+ */
+ ServerTransaction tr;
+ try {
+ /* This may fix a broken Txn, if it does it won't get moved
+ * from the broken to the unbroken list. It will get
+ * moved eventually, but it does seem unfortunate it does
+ * not happen immediately
+ */
+ tr = txn.getTransaction(
+ monitor.space().getRecoveredTransactionManagerPreparer());
+ } catch (RemoteException e) {
+ final int cat = ThrowableConstants.retryable(e);
+
+ if (cat == ThrowableConstants.BAD_INVOCATION ||
+ cat == ThrowableConstants.BAD_OBJECT)
+ {
+ // Not likely to get better, give up
+ logUnpackingFailure("definite exception", Level.INFO,
+ true, e);
+ return true;
+ } else if (cat == ThrowableConstants.INDEFINITE) {
+ // try, try, again
+ logUnpackingFailure("indefinite exception", Levels.FAILED,
+ false, e);
+ mustQuery = true;
+ return false;
+ } else if (cat == ThrowableConstants.UNCATEGORIZED) {
+ // Same as above but log differently.
+ mustQuery = true;
+ logUnpackingFailure("uncategorized exception", Level.INFO,
+ false, e);
+ return false;
+ } else {
+ logger.log(Level.WARNING, "ThrowableConstants.retryable " +
+ "returned out of range value, " + cat,
+ new AssertionError(e));
+ return false;
+ }
+ } catch (IOException e) {
+ // Not likely to get better
+ logUnpackingFailure("IOException", Level.INFO, true, e);
+ return true;
+ } catch (RuntimeException e) {
+ // Not likely to get better
+ logUnpackingFailure("RuntimeException", Level.INFO, true, e);
+ return true;
+ } catch (ClassNotFoundException e) {
+ // codebase probably down, keep trying
+ logUnpackingFailure("ClassNotFoundException", Levels.FAILED,
+ false, e);
+ mustQuery = true;
+ return false;
+ }
+
+ if (logger.isLoggable(Level.FINEST))
+ logger.log(Level.FINEST, "{0} tr = {1}", new Object[]{this, tr});
+
+ int trState;
+ try {
+ trState = tr.getState();
+ } catch (TransactionException e) {
+ if (logger.isLoggable(Level.INFO))
+ logger.log(Level.INFO, "Got TransactionException when " +
+ "calling getState on " + tr + ", dropping transaction " +
+ tr.id, e);
+ trState = ABORTED;
+ } catch (NoSuchObjectException e) {
+ /* It would be epsilon better to to give up immediately
+ * if we get a NoSuchObjectException and we are in the
+ * active state, however, the code to do this would
+ * be very complicated since we need to hold a lock to
+ * while reading and acting on the state.
+ */
+ if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, "Got NoSuchObjectException when " +
+ "calling getState on " + tr + ", this was the " +
+ failCnt + " RemoteException, dropping transaction" +
+ tr.id, e);
+ }
+ trState = ABORTED;
+ } else {
+ if (logger.isLoggable(Levels.FAILED)) {
+ logger.log(Levels.FAILED, "Got NoSuchObjectException " +
+ "when calling getState on " + tr + ", failCount = " +
+ failCnt + ", will retry", e);
+ }
+ mustQuery = true; // keep on trying
+ return false; // try again next time
+ }
+ } catch (RemoteException e) {
+ if (failCnt.incrementAndGet() >= MAX_FAILURES) {
+ /* abort if we are not prepared and not already
+ * aborted. If prepared retry, otherwise
+ * we're done. Check state and make any abort() call
+ * atomically so we can't accidently abort a prepared
+ * transaction.
+ */
+ synchronized (txn) {
+ switch (txn.getState()) {
+ case ACTIVE:
+ // Safe to abort, give up
+ if (logger.isLoggable(Level.INFO)) {
+ logger.log(Level.INFO, "Got RemoteException " +
+ "when calling getState on " + tr + ", this " +
+ "was " + failCnt + " RemoteException, " +
+ "dropping active transaction " + tr.id, e);
+ }
+
+ try {
+ monitor.space().abort(tr.mgr, tr.id);
+ return true;
+ } catch (UnknownTransactionException ute) {
+ throw new AssertionError(ute);
+ } catch (UnmarshalException ume) {
+ throw new AssertionError(ume);
+ }
+ case PREPARED:
+ final Level l = (failCnt.get()%MAX_FAILURES == 0)?
+ Level.INFO:Levels.FAILED;
+ if (logger.isLoggable(l)) {
+ logger.log(l, "Got RemoteException when calling " +
+ "getState on " + tr + ", this was " +
+ failCnt + " RemoteException, will keep " +
+ "prepared transaction " + tr.id, e);
+ }
+
+ // Can't give up, keep on trying to find real state
+ mustQuery = true;
+ return false;
+ case ABORTED:
+ case COMMITTED:
+ // done
+ return true;
+ default:
+ throw new AssertionError("Txn in unreachable state");
+ }
+ }
+ } else {
+ // Don't know, but not ready to give up
+ if (logger.isLoggable(Levels.FAILED)) {
+ logger.log(Levels.FAILED, "Got RemoteException when " +
+ "calling getState on " + tr + ", failCount = " +
+ failCnt + ", will retry", e);
+ }
+
+ mustQuery = true; // keep on trying
+ return false; // try again next time
+ }
+ }
+
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER, "{0} trState = {1}",
+ new Object[]{this, Integer.valueOf(trState)});
+ }
+
+ failCnt.set(0); // reset failures -- we got a response
+
+ /*
+ * If the two states aren't the same, the state changed and we
+ * need to account for that locally here by calling the method
+ * that would make the change (the one we should have gotten.
+ * (We use the external forms of abort, commit, etc., because
+ * they are what the manager would call, and therefore these
+ * calls will always do exactly what the incoming manager
+ * calls would have done. I don't want this to be fragile by
+ * bypassing those calls and going straight to the Txn
+ * object's calls, which might skip something important in the
+ * OutriggerServerImpl calls).
+ */
+
+ if (trState != txnState) {
+ if (logger.isLoggable(Level.FINER)) {
+ logger.log(Level.FINER,
+ "{0} mgr state[{1}] != local state [{2}]",
+ new Object[]{this,
+ TxnConstants.getName(trState),
+ TxnConstants.getName(txnState)});
+ }
+
+ try {
+ switch (trState) {
+ case ABORTED:
+ logger.log(Level.FINER, "{0} moving to abort", this);
+
+ monitor.space().abort(tr.mgr, tr.id);
+ return true;
+
+ case COMMITTED:
+ logger.log(Level.FINER, "{0} moving to commit", this);
+
+ monitor.space().commit(tr.mgr, tr.id);
+ return true;
+ }
+ } catch (UnknownTransactionException e) {
+ // we must somehow have already gotten the abort() or
+ // commit(), and have therefore forgotten about the
+ // transaction, while this code was executing
+ return true;
+ } catch (UnmarshalException ume) {
+ throw new AssertionError(ume);
+ }
+
+ // we can't fake anything else -- the manager will have to call
+ // us
+ }
+
+ logger.log(Level.FINEST, "{0} return false", this);
+
+ return false; // now we know so nothing more to do
+ }
+
+ /**
+ * Add in a resource. The lease may already be in, in which case it is
+ * ignored, or it may be null, in which case it was a non-leased probe
+ * that was blocked and we simply set <code>mustQuery</code> to
+ * <code>true</code>.
+ */
+ synchronized void add(QueryWatcher query) {
+ if (query == null || query.getExpiration() <= nextQuery.get()) {
+ if (logger.isLoggable(Level.FINEST))
+ logger.log(Level.FINEST, "adding resource to task -- SHORT");
+ mustQuery = true;
+ } else {
+ if (logger.isLoggable(Level.FINEST))
+ logger.log(Level.FINEST, "adding resource to task -- LONG");
+ if (queries == null)
+ queries = new WeakHashMap<QueryWatcher,Collection<Txn>>();// we use it like a WeakHashSet
+ queries.put(query, null);
+ }
+ }
+
+ /** Log failed unpacking attempt attempt */
+ private void logUnpackingFailure(String exceptionDescription, Level level,
+ boolean terminal, Throwable t)
+ {
+ if (logger.isLoggable(level)) {
+ logger.log(level, "Encountered " + exceptionDescription +
+ "while unpacking exception to check state, " +
+ (terminal?"dropping":"keeping") + " monitoring task", t);
+ }
+ }
+
+}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java
URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java?rev=1879521&r1=1879520&r2=1879521&view=diff
==============================================================================
--- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java (original)
+++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TypeTree.java Sun Jul 5 11:41:39 2020
@@ -26,6 +26,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
+import org.apache.river.outrigger.proxy.EntryRep;
+
+
/**
* A type tree for entries. It maintains, for each class, a list of