You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/03/05 07:49:06 UTC

svn commit: r383288 - /webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java

Author: dasarath
Date: Sat Mar  4 22:49:06 2006
New Revision: 383288

URL: http://svn.apache.org/viewcvs?rev=383288&view=rev
Log:
applied patch for Hannes' speed improvements

Modified:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java?rev=383288&r1=383287&r2=383288&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java Sat Mar  4 22:49:06 2006
@@ -7,6 +7,7 @@
 import java.net.MalformedURLException;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,6 +32,7 @@
 
 /**
  * @author Dasarath Weeratunge
+ * @author Hannes Erven <ha...@erven.at>
  *  
  */
 public class ATCoordinatorImpl extends CoordinatorImpl implements ATCoordinator {
@@ -49,8 +51,6 @@
 
 	public static final int RETRY_DELAY_MILLIS = 20 * 1000;
 
-	public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
 	public ATCoordinatorImpl() throws MalformedURIException {
 		super(COORDINATION_TYPE_ID);
 	}
@@ -93,9 +93,22 @@
 		return epr;
 	}
 
+	/**
+	 * Forget about this particular participant of the transaction. We only do
+	 * this when that participant has acknowledged the final transaction
+	 * outcome.
+	 * 
+	 * @param participantRef
+	 *            The participant reference.
+	 */
 	public void forget2PC(String participantRef) {
+		/*
+		 * Remove the participant from the volatile or else the durable 2PC map.
+		 */
 		if (volatile2PCParticipants.remove(participantRef) == null)
 			durable2PCParticipants.remove(participantRef);
+
+		notifyAll();
 	}
 
 	public void rollback() {
@@ -207,6 +220,15 @@
 		}
 	}
 
+	/**
+	 * Handles calls to the Coordinator's "prepared" method. The given
+	 * participant is marked as being prepared.
+	 * 
+	 * @param participantRef
+	 *            The participant to be marked.
+	 * @throws AxisFault
+	 *             Some fault, e.g. INVALID_STATE
+	 */
 	public void prepared(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
@@ -217,9 +239,15 @@
 			}
 			return;
 
+		/*
+		 * If we are currently in the PREPARE phase, mark the participant as
+		 * prepared and notify the waiting thread. (wait() is called in
+		 * prepare(Map) )
+		 */
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
 			preparedParticipants.add(participantRef);
+			notifyAll();
 			return;
 
 		case AT2PCStatus.COMMITTING:
@@ -293,19 +321,42 @@
 		}
 	}
 
+	/**
+	 * Send a "prepare" message to all participants in the keySet of
+	 * "participants". The messages are resent up to "maxRetries" times; between
+	 * each retry, up to RETRY_DELAY_MILLIS milliseconds are waited.
+	 * 
+	 * @param participants
+	 *            The map which contains the participants in its keySet.
+	 * @return true, if all participants are prepared. false, if there are
+	 *         unprepared participants remaining after trying maxRetries times.
+	 */
 	private boolean prepare(Map participants) {
-		int iters = 0;
-		int status_old = status;
-
+		/*
+		 * Check if there are any participants in this map
+		 */
+		if (participants.size() == 0)
+			/*
+			 * "No participants" means OK
+			 */
+			return true;
+
+		int iters = 0; // iteration count
+		int status_old = status; // State when beginning to prepare
+
+		/*
+		 * Send the "prepare" message to all unprepared participants. Retry up
+		 * to maxRetries times.
+		 */
 		while (iters < maxRetries) {
-			if (iters++ > 0)
-				pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
-
 			Iterator iter = participants.keySet().iterator();
 			while (iter.hasNext()) {
 				if (status == AT2PCStatus.ABORTING)
 					return false;
 				try {
+					/*
+					 * Call the participant's "prepare" method
+					 */
 					String participantRef = (String) iter.next();
 					getParticipantStub(participantRef,
 						(EndpointReference) participants.get(participantRef)).prepareOperation(
@@ -315,12 +366,44 @@
 				}
 			}
 
-			pause(RESPONSE_DELAY_MILLIS);
+			/*
+			 * Wait for arriving messages, max. RETRY_DELAY_MILLIS milliseconds
+			 * for each retry.
+			 */
+			long startTime = (new Date()).getTime();
+			long curTime;
+
+			/*
+			 * For each retry: wait for incoming messages. If unprepared
+			 * participants remain, wait for the remaining time up until
+			 * RETRY_DELAY_MILLIS is reached or another message arrives.
+			 */
+			while ((curTime = (new Date().getTime())) < startTime
+					+ RETRY_DELAY_MILLIS) {
+				/*
+				 * Wait for incoming messages. notifyAll() is called in
+				 * prepared(String).
+				 */
+				try {
+					wait(startTime - curTime + RETRY_DELAY_MILLIS);
+				} catch (Exception e) {
+					// No exception handling needed for wait();
+				}
 
-			if (preparedParticipants.containsAll(participants.keySet()))
-				return status == status_old;
+				/*
+				 * Are all participants prepared?
+				 */
+				if (preparedParticipants.containsAll(participants.keySet()))
+					// Yes! - Return true, if the transaction state did not
+					// change in the mean time.
+					return status == status_old;
+			}
 		}
 
+		/*
+		 * After trying so hard and sending maxRetries messages, there is still
+		 * at least one unprepared participant.
+		 */
 		return false;
 	}
 
@@ -368,22 +451,39 @@
 				&& durable2PCParticipants.isEmpty();
 	}
 
+	/**
+	 * Handles transaction teardown and communicates the final state to
+	 * participants. Each participant is notified up to maxRetries times.
+	 * Between each message, we will wait up to RETRY_DELAY_MILLIS milliseconds.
+	 */
 	private void terminate() {
 		int iters = 0;
-		while (iters < maxRetries && !noParticipantsToTerminate()) {
-
-			if (iters++ > 0)
-				pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
 
+		/*
+		 * This is the main loop. While there are participants who did not yet
+		 * acknowledge our message, (re)send our message to them.
+		 */
+		waitForAllParticipantsToComplete: while (iters < maxRetries
+				&& !noParticipantsToTerminate()) {
+			/*
+			 * Participant set to operate on. For each retry, send our message
+			 * to volatile peers first and then to durable participants.
+			 */
 			Map participants = volatile2PCParticipants;
 			while (true) {
 				Iterator iter = participants.keySet().iterator();
-				while (iter.hasNext())
+
+				while (iter.hasNext()) {
 					try {
+						/*
+						 * Get the participant's protocol service and send our
+						 * final state message.
+						 */
 						String participantRef = (String) iter.next();
 						ParticipantStub p = getParticipantStub(
 							participantRef,
 							(EndpointReference) participants.get(participantRef));
+
 						if (status == AT2PCStatus.ABORTING)
 							p.rollbackOperation(null);
 						else
@@ -391,17 +491,57 @@
 					} catch (Exception e) {
 						e.printStackTrace();
 					}
+				}
+
+				/*
+				 * After all volatile participants are notified, continue with
+				 * durable participants. After that, wait for incoming messages.
+				 */
 				if (participants == volatile2PCParticipants)
 					participants = durable2PCParticipants;
 				else
 					break;
 			}
 
-			pause(RESPONSE_DELAY_MILLIS);
+			/*
+			 * Messages to all remaining participants are out. Wait up until
+			 * RETRY_DELAY_MILLIS milliseconds for replies. On each incoming
+			 * reply, forget2PC() will call notifyAll() so we can check if all
+			 * participants have acknowledged the transaction outcome and can
+			 * thus continue. If there are remaining peers after an incoming
+			 * message, return to wait and sleep for the rest of the
+			 * RETRY_DELAY_MILLIS period.
+			 */
+			try {
+				long startTime = (new Date()).getTime();
+				long curTime;
+
+				while ((curTime = (new Date().getTime())) < startTime
+						+ RETRY_DELAY_MILLIS) {
+					/*
+					 * Wait for incoming acknowledgements. notifyAll() is called in
+					 * forget2PC(String).
+					 */
+					wait(startTime - curTime + RETRY_DELAY_MILLIS);
+
+					if (noParticipantsToTerminate())
+						break waitForAllParticipantsToComplete;
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
 		}
 
+		/*
+		 * All participants have acknowledged the final transaction state. Notify
+		 * all peers who have subscribed for the completion protocol and forget
+		 * about our little transaction. ;-)
+		 */
 		if (noParticipantsToTerminate()) {
-
+			/*
+			 * TODO shouldn't this message also be acknowledged, at least if
+			 * there was an exception (e.g. timeout) caught?
+			 */
 			Iterator iter = completionParticipants.iterator();
 			while (iter.hasNext())
 				try {
@@ -415,7 +555,6 @@
 					e.printStackTrace();
 				}
 		}
-
 		status = AT2PCStatus.NONE;
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org