You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2007/06/18 22:07:38 UTC

svn commit: r548467 - /webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java

Author: dasarath
Date: Mon Jun 18 13:07:36 2007
New Revision: 548467

URL: http://svn.apache.org/viewvc?view=rev&rev=548467
Log:
Hannes Erven, Georg Hicker

Modified:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewvc/webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java?view=diff&rev=548467&r1=548466&r2=548467
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/kandula/coordinator/at/ATCoordinatorImpl.java Mon Jun 18 13:07:36 2007
@@ -55,11 +55,16 @@
 		super(COORDINATION_TYPE_ID);
 	}
 
-	public EndpointReference register(String protocol,
-			EndpointReference participantProtocolService)
-			throws InvalidCoordinationProtocolException {
+	/**
+	 * Register a new participant 
+	 */
+	public EndpointReference register(
+			final String protocol,
+			final EndpointReference participantProtocolService,
+			final String matchcode
+	) throws InvalidCoordinationProtocolException {
 
-		if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
+		if (!(this.status == AT2PCStatus.ACTIVE || this.status == AT2PCStatus.PREPARING_VOLATILE))
 			throw new IllegalStateException();
 
 		CoordinationService cs = CoordinationService.getInstance();
@@ -68,7 +73,7 @@
 
 		if (protocol.equals(PROTOCOL_ID_COMPLETION)) {
 			if (participantProtocolService != null)
-				completionParticipants.add(participantProtocolService);
+				this.completionParticipants.add(participantProtocolService);
 
 			epr = cs.getCompletionCoordinatorService(this);
 		} else {
@@ -79,10 +84,10 @@
 			participantRef = "uuid:" + gen.nextUUID();
 
 			if (protocol.equals(PROTOCOL_ID_VOLATILE_2PC))
-				volatile2PCParticipants.put(participantRef,
+				this.volatile2PCParticipants.put(participantRef,
 					participantProtocolService);
 			else if (protocol.equals(PROTOCOL_ID_DURABLE_2PC))
-				durable2PCParticipants.put(participantRef,
+				this.durable2PCParticipants.put(participantRef,
 					participantProtocolService);
 			else
 				throw new InvalidCoordinationProtocolException();
@@ -105,18 +110,18 @@
 		/*
 		 * Check for which protocols the participants had registered and remove
 		 */
-		if (volatile2PCParticipants.remove(participantRef) == null)
-			durable2PCParticipants.remove(participantRef);
+		if (this.volatile2PCParticipants.remove(participantRef) == null)
+			this.durable2PCParticipants.remove(participantRef);
 
 		notifyAll();
 	}
 
 	public void rollback() {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			status = AT2PCStatus.ABORTING;
+			this.status = AT2PCStatus.ABORTING;
 			terminate();
 			return;
 
@@ -127,7 +132,7 @@
 	}
 
 	public void aborted(String participantRef) throws AxisFault {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
@@ -148,7 +153,7 @@
 	}
 
 	public void readOnly(String participantRef) throws AxisFault {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
@@ -173,7 +178,7 @@
 	}
 
 	public void replay(String participantRef) throws AxisFault {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
@@ -205,8 +210,9 @@
 		case AT2PCStatus.NONE:
 			if (volatile2PCParticipants.containsKey(participantRef))
 				trigger(participantRef, INVALID_STATE_SOAP_FAULT());
+
 			else {
-				epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+				epr = (EndpointReference) this.durable2PCParticipants.get(participantRef);
 				if (epr == null)
 					epr = org.apache.kandula.utils.AddressingHeaders.getReplyToOfCurrentMessage();
 				if (epr != null)
@@ -230,7 +236,7 @@
 	 *             Some fault, e.g. INVALID_STATE
 	 */
 	public void prepared(String participantRef) throws AxisFault {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 			try {
 				trigger(participantRef, INVALID_STATE_SOAP_FAULT());
@@ -246,7 +252,7 @@
 		 */
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			preparedParticipants.add(participantRef);
+			this.preparedParticipants.add(participantRef);
 			notifyAll();
 			return;
 
@@ -264,8 +270,9 @@
 		case AT2PCStatus.ABORTING:
 			if (volatile2PCParticipants.remove(participantRef) != null)
 				trigger(participantRef, INVALID_STATE_SOAP_FAULT());
+
 			else {
-				epr = (EndpointReference) durable2PCParticipants.remove(participantRef);
+				epr = (EndpointReference) this.durable2PCParticipants.remove(participantRef);
 				if (epr == null)
 					epr = org.apache.kandula.utils.AddressingHeaders.getReplyToOfCurrentMessage();
 				if (epr != null) {
@@ -282,8 +289,9 @@
 		case AT2PCStatus.NONE:
 			if (volatile2PCParticipants.containsKey(participantRef))
 				trigger(participantRef, INVALID_STATE_SOAP_FAULT());
+
 			else {
-				epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+				epr = (EndpointReference) this.durable2PCParticipants.get(participantRef);
 				if (epr == null)
 					epr = org.apache.kandula.utils.AddressingHeaders.getReplyToOfCurrentMessage();
 				if (epr != null)
@@ -298,7 +306,7 @@
 	}
 
 	public void committed(String participantRef) throws AxisFault {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
@@ -342,7 +350,7 @@
 			return true;
 
 		int iters = 0; // iteration count
-		int status_old = status; // State when beginning to prepare
+		int status_old = this.status; // State when beginning to prepare
 
 		/*
 		 * Send the "prepare" message to all unprepared participants. Retry up
@@ -351,7 +359,7 @@
 		while (iters < maxRetries) {
 			Iterator iter = participants.keySet().iterator();
 			while (iter.hasNext()) {
-				if (status == AT2PCStatus.ABORTING)
+				if (this.status == AT2PCStatus.ABORTING)
 					return false;
 				try {
 					/*
@@ -393,10 +401,10 @@
 				/*
 				 * Are all participants prepared?
 				 */
-				if (preparedParticipants.containsAll(participants.keySet()))
+				if (this.preparedParticipants.containsAll(participants.keySet()))
 					// Yes! - Return true, if the transaction state did not
 					// change in the mean time.
-					return status == status_old;
+					return this.status == status_old;
 			}
 		}
 
@@ -408,16 +416,16 @@
 	}
 
 	private boolean prepare() {
-		status = AT2PCStatus.PREPARING_VOLATILE;
-		if (!prepare(volatile2PCParticipants))
+		this.status = AT2PCStatus.PREPARING_VOLATILE;
+		if (! prepare(this.volatile2PCParticipants))
 			return false;
 
-		status = AT2PCStatus.PREPARING_DURABLE;
-		return prepare(durable2PCParticipants);
+		this.status = AT2PCStatus.PREPARING_DURABLE;
+		return prepare(this.durable2PCParticipants);
 	}
 
 	public void commit() {
-		switch (status) {
+		switch (this.status) {
 		case AT2PCStatus.ACTIVE:
 			break;
 
@@ -434,21 +442,13 @@
 			return;
 		}
 
-		status = AT2PCStatus.COMMITTING;
+		this.status = AT2PCStatus.COMMITTING;
 		terminate();
 	}
 
-	private void pause(long millis) {
-		try {
-			wait(millis);
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-		}
-	}
-
 	private boolean noParticipantsToTerminate() {
-		return volatile2PCParticipants.isEmpty()
-				&& durable2PCParticipants.isEmpty();
+		return this.volatile2PCParticipants.isEmpty()
+				&& this.durable2PCParticipants.isEmpty();
 	}
 
 	/**
@@ -469,7 +469,7 @@
 			 * Participant set to operate on. For each retry, send our message
 			 * to volatile peers first and then to durable participants.
 			 */
-			Map participants = volatile2PCParticipants;
+			Map participants = this.volatile2PCParticipants;
 			while (true) {
 				Iterator iter = participants.keySet().iterator();
 
@@ -484,7 +484,7 @@
 							participantRef,
 							(EndpointReference) participants.get(participantRef));
 
-						if (status == AT2PCStatus.ABORTING)
+						if (this.status == AT2PCStatus.ABORTING)
 							p.rollbackOperation(null);
 						else
 							p.commitOperation(null);
@@ -497,8 +497,8 @@
 				 * After all volatile participants are notified, continue with
 				 * durable participants. After that, wait for incoming messages.
 				 */
-				if (participants == volatile2PCParticipants)
-					participants = durable2PCParticipants;
+				if (participants == this.volatile2PCParticipants)
+					participants = this.durable2PCParticipants;
 				else
 					break;
 			}
@@ -539,15 +539,15 @@
 		 */
 		if (noParticipantsToTerminate()) {
 			/*
-			 * TODO shouldn't this message also be acknowledged, at least if
+			 * TODO WSAT shouldn't this message also be acknowledged, at least if
 			 * there was an exception (e.g. timeout) caught?
 			 */
-			Iterator iter = completionParticipants.iterator();
+			Iterator iter = this.completionParticipants.iterator();
 			while (iter.hasNext())
 				try {
 					CompletionInitiatorStub p = new CompletionInitiatorStub(
 							(EndpointReference) iter.next());
-					if (status == AT2PCStatus.ABORTING)
+					if (this.status == AT2PCStatus.ABORTING)
 						p.abortedOperation(null);
 					else
 						p.committedOperation(null);
@@ -555,7 +555,7 @@
 					e.printStackTrace();
 				}
 		}
-		status = AT2PCStatus.NONE;
+		this.status = AT2PCStatus.NONE;
 	}
 
 	public synchronized void preparedOperation(Notification parameters)
@@ -591,10 +591,10 @@
 	}
 
 	private EndpointReference getEprToRespond(String participantRef) {
-		EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(participantRef);
+		EndpointReference epr = (EndpointReference) this.volatile2PCParticipants.get(participantRef);
 		if (epr != null)
 			return epr;
-		epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+		epr = (EndpointReference) this.durable2PCParticipants.get(participantRef);
 		if (epr != null)
 			return epr;
 		return org.apache.kandula.utils.AddressingHeaders.getReplyToOfCurrentMessage();
@@ -618,9 +618,9 @@
 
 	public synchronized void timeout() throws TimedOutException {
 		System.out.println("[ATCoordinatorImpl] timeout "
-				+ AT2PCStatus.getStatusName(status));
+				+ AT2PCStatus.getStatusName(this.status));
 
-		if (status != AT2PCStatus.NONE) {
+		if (this.status != AT2PCStatus.NONE) {
 			maxRetries = 8;
 			rollback();
 			throw new TimedOutException();
@@ -638,7 +638,7 @@
 	 * @see org.apache.ws.transaction.coordinator.Callback#onFault(javax.xml.soap.Name)
 	 */
 	public synchronized void onFault(Name code) {
-		// TODO Auto-generated method stub
+		// TODO WSAT Auto-generated method stub
 
 	}
 



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org