You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/03/05 07:49:06 UTC
svn commit: r383288 -
/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
Author: dasarath
Date: Sat Mar 4 22:49:06 2006
New Revision: 383288
URL: http://svn.apache.org/viewcvs?rev=383288&view=rev
Log:
Applied Hannes' patch for speed improvements
Modified:
webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java?rev=383288&r1=383287&r2=383288&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java Sat Mar 4 22:49:06 2006
@@ -7,6 +7,7 @@
import java.net.MalformedURLException;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,6 +32,7 @@
/**
* @author Dasarath Weeratunge
+ * @author Hannes Erven <ha...@erven.at>
*
*/
public class ATCoordinatorImpl extends CoordinatorImpl implements ATCoordinator {
@@ -49,8 +51,6 @@
public static final int RETRY_DELAY_MILLIS = 20 * 1000;
- public static final int RESPONSE_DELAY_MILLIS = 3 * 1000;
-
public ATCoordinatorImpl() throws MalformedURIException {
super(COORDINATION_TYPE_ID);
}
@@ -93,9 +93,22 @@
return epr;
}
+ /**
+ * Forget about this particular participant of the transaction. We only do
+ * this when that participant has acknowledged the final transaction
+ * outcome.
+ *
+ * @param participantRef
+ * The participant reference.
+ */
public void forget2PC(String participantRef) {
+ /*
+ * Check which protocol the participant had registered for (volatile or
+ * durable 2PC) and remove it from the corresponding map.
+ */
if (volatile2PCParticipants.remove(participantRef) == null)
durable2PCParticipants.remove(participantRef);
+
+ notifyAll();
}
public void rollback() {
@@ -207,6 +220,15 @@
}
}
+ /**
+ * Handles calls to the Coordinator's "prepared" method. The given
+ * participant is marked as being prepared.
+ *
+ * @param participantRef
+ * The participant to be marked.
+ * @throws AxisFault
+ * Some fault, e.g. INVALID_STATE
+ */
public void prepared(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
@@ -217,9 +239,15 @@
}
return;
+ /*
+ * If we are currently in the PREPARE phase, mark the participant as
+ * prepared and notify the waiting thread. (wait() is called in
+ * prepare(Map) )
+ */
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
preparedParticipants.add(participantRef);
+ notifyAll();
return;
case AT2PCStatus.COMMITTING:
@@ -293,19 +321,42 @@
}
}
+ /**
+ * Send a "prepare" message to all participants in the keySet of
+ * "participants". The messages are resent up to "maxRetries" times; between
+ * each retry, up to RETRY_DELAY_MILLIS milliseconds are waited.
+ *
+ * @param participants
+ * The map which contains the participants in its keySet.
+ * @return true, if all participants are prepared. false, if there are
+ * unprepared participants remaining after trying maxRetries times.
+ */
private boolean prepare(Map participants) {
- int iters = 0;
- int status_old = status;
-
+ /*
+ * Check if there are any participants in this map
+ */
+ if (participants.size() == 0)
+ /*
+ * "No participants" means OK
+ */
+ return true;
+
+ int iters = 0; // iteration count
+ int status_old = status; // State when beginning to prepare
+
+ /*
+ * Send the "prepare" message to all unprepared participants. Retry up
+ * to maxRetries times.
+ */
while (iters < maxRetries) {
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
-
Iterator iter = participants.keySet().iterator();
while (iter.hasNext()) {
if (status == AT2PCStatus.ABORTING)
return false;
try {
+ /*
+ * Call the participant's "prepare" method
+ */
String participantRef = (String) iter.next();
getParticipantStub(participantRef,
(EndpointReference) participants.get(participantRef)).prepareOperation(
@@ -315,12 +366,44 @@
}
}
- pause(RESPONSE_DELAY_MILLIS);
+ /*
+ * Wait for arriving messages, max. RETRY_DELAY_MILLIS milliseconds
+ * for each retry.
+ */
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ /*
+ * For each retry: wait for incoming messages. If unprepared
+ * participants remain, wait for the remaining time up until
+ * RETRY_DELAY_MILLIS is reached or another message arrives.
+ */
+ while ((curTime = (new Date().getTime())) < startTime
+ + RETRY_DELAY_MILLIS) {
+ /*
+ * Wait for incoming messages. notifyAll() is called in
+ * prepared(String).
+ */
+ try {
+ wait(startTime - curTime + RETRY_DELAY_MILLIS);
+ } catch (Exception e) {
+ // No exception handling needed for wait();
+ }
- if (preparedParticipants.containsAll(participants.keySet()))
- return status == status_old;
+ /*
+ * Are all participants prepared?
+ */
+ if (preparedParticipants.containsAll(participants.keySet()))
+ // Yes! - Return true, if the transaction state did not
+ // change in the mean time.
+ return status == status_old;
+ }
}
+ /*
+ * After trying so hard and sending maxRetries messages, there is still
+ * at least one unprepared participant.
+ */
return false;
}
@@ -368,22 +451,39 @@
&& durable2PCParticipants.isEmpty();
}
+ /**
+ * Handles transaction teardown and communicates the final state to
+ * participants. Each participant is notified up to maxRetries times.
+ * Between each message, we will wait up to RETRY_DELAY_MILLIS milliseconds.
+ */
private void terminate() {
int iters = 0;
- while (iters < maxRetries && !noParticipantsToTerminate()) {
-
- if (iters++ > 0)
- pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
+ /*
+ * This is the main loop. While there are participants who did not yet
+ * acknowledge our message, (re)send our message to them.
+ */
+ waitForAllParticipantsToComplete: while (iters < maxRetries
+ && !noParticipantsToTerminate()) {
+ /*
+ * Participant set to operate on. For each retry, send our message
+ * to volatile peers first and then to durable participants.
+ */
Map participants = volatile2PCParticipants;
while (true) {
Iterator iter = participants.keySet().iterator();
- while (iter.hasNext())
+
+ while (iter.hasNext()) {
try {
+ /*
+ * Get the participant's protocol service and send our
+ * final state message.
+ */
String participantRef = (String) iter.next();
ParticipantStub p = getParticipantStub(
participantRef,
(EndpointReference) participants.get(participantRef));
+
if (status == AT2PCStatus.ABORTING)
p.rollbackOperation(null);
else
@@ -391,17 +491,57 @@
} catch (Exception e) {
e.printStackTrace();
}
+ }
+
+ /*
+ * After all volatile participants are notified, continue with
+ * durable participants. After that, wait for incoming messages.
+ */
if (participants == volatile2PCParticipants)
participants = durable2PCParticipants;
else
break;
}
- pause(RESPONSE_DELAY_MILLIS);
+ /*
+ * Messages to all remaining participants are out. Wait up until
+ * RETRY_DELAY_MILLIS milliseconds for replies. On each incoming
+ * reply, forget2PC() will call notifyAll() so we can check if all
+ * participants have acknowledged the transaction outcome and can
+ * thus continue. If there are remaining peers after an incoming
+ * message, return to wait and sleep for the rest of the
+ * RETRY_DELAY_MILLIS period.
+ */
+ try {
+ long startTime = (new Date()).getTime();
+ long curTime;
+
+ while ((curTime = (new Date().getTime())) < startTime
+ + RETRY_DELAY_MILLIS) {
+ /*
+ * Wait for incoming acknowledgements. notifyAll() is called in
+ * forget2PC(String).
+ */
+ wait(startTime - curTime + RETRY_DELAY_MILLIS);
+
+ if (noParticipantsToTerminate())
+ break waitForAllParticipantsToComplete;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
+ /*
+ * All participants have acknowledged the final transaction state. Notify
+ * all peers who have subscribed for the completion protocol and forget
+ * about our little transaction. ;-)
+ */
if (noParticipantsToTerminate()) {
-
+ /*
+ * TODO shouldn't this message also be acknowledged, at least if
+ * there was an exception (e.g. timeout) caught?
+ */
Iterator iter = completionParticipants.iterator();
while (iter.hasNext())
try {
@@ -415,7 +555,6 @@
e.printStackTrace();
}
}
-
status = AT2PCStatus.NONE;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org