You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/02 20:08:01 UTC

svn commit: r365395 - /webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java

Author: dasarath
Date: Mon Jan  2 11:07:59 2006
New Revision: 365395

URL: http://svn.apache.org/viewcvs?rev=365395&view=rev
Log: (empty)

Modified:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java?rev=365395&r1=365394&r2=365395&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java Mon Jan  2 11:07:59 2006
@@ -6,7 +6,6 @@
 
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -14,8 +13,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import javax.xml.namespace.QName;
-
 import org.apache.axis.AxisFault;
 import org.apache.axis.MessageContext;
 import org.apache.axis.components.uuid.UUIDGen;
@@ -27,7 +24,6 @@
 import org.apache.axis.types.URI.MalformedURIException;
 import org.apache.ws.transaction.coordinator.CoordinationService;
 import org.apache.ws.transaction.coordinator.CoordinatorImpl;
-import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
 import org.apache.ws.transaction.coordinator.TimedOutException;
 import org.apache.ws.transaction.wsat.Notification;
 
@@ -37,19 +33,17 @@
  */
 public class ATCoordinatorImpl extends CoordinatorImpl implements ATCoordinator {
 
-	int status = AT2PCStatus.NONE;
-
-	private static final int VOLATILE = 0;
+	int status = AT2PCStatus.ACTIVE;
 
-	private static final int DURABLE = 1;
+	Set preparedParticipants = new HashSet();
 
-	Map participants2PC[] = new Map[2];
+	List completionParticipants = new ArrayList();
 
-	Set prepared = Collections.synchronizedSet(new HashSet());
+	Map volatile2PCParticipants = new HashMap();
 
-	List participantsComp = Collections.synchronizedList(new ArrayList());
+	Map durable2PCParticipants = new HashMap();
 
-	public static final int MAX_RETRIES = 10;
+	public static int maxRetries = 10;
 
 	public static final int RETRY_DELAY_MILLIS = 20 * 1000;
 
@@ -57,13 +51,10 @@
 
 	public ATCoordinatorImpl() throws MalformedURIException {
 		super(COORDINATION_TYPE_ID);
-		status = AT2PCStatus.ACTIVE;
-		for (int i = 0; i < 2; i++)
-			participants2PC[i] = Collections.synchronizedMap(new HashMap());
 	}
 
 	public EndpointReference register(String prot, EndpointReference pps)
-			throws InvalidCoordinationProtocolException {
+			throws AxisFault {
 		if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
 			throw new IllegalStateException();
 		CoordinationService cs = CoordinationService.getInstance();
@@ -71,7 +62,7 @@
 		EndpointReference epr = null;
 		if (prot.equals(PROTOCOL_ID_COMPLETION)) {
 			if (pps != null)
-				participantsComp.add(pps);
+				completionParticipants.add(pps);
 			epr = cs.getCompletionCoordinatorService(this);
 		} else {
 			if (pps == null)
@@ -79,19 +70,19 @@
 			UUIDGen gen = UUIDGenFactory.getUUIDGen();
 			ref = "uuid:" + gen.nextUUID();
 			if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
-				participants2PC[VOLATILE].put(ref, pps);
+				volatile2PCParticipants.put(ref, pps);
 			else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
-				participants2PC[DURABLE].put(ref, pps);
+				durable2PCParticipants.put(ref, pps);
 			else
-				throw new InvalidCoordinationProtocolException();
+				throw INVALID_PROTOCOL_SOAP_FAULT;
 			epr = cs.getCoordinatorService(this, ref);
 		}
 		return epr;
 	}
 
 	public void forget2PC(String ref) {
-		if (participants2PC[VOLATILE].remove(ref) == null)
-			participants2PC[DURABLE].remove(ref);
+		if (volatile2PCParticipants.remove(ref) == null)
+			durable2PCParticipants.remove(ref);
 	}
 
 	public void rollback() {
@@ -109,7 +100,7 @@
 		}
 	}
 
-	public void aborted(String ref) {
+	public void aborted(String ref) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
@@ -119,7 +110,7 @@
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			throw new IllegalStateException();
+			throw INVALID_STATE_SOAP_FAULT;
 
 		case AT2PCStatus.ABORTING:
 			forget2PC(ref);
@@ -129,7 +120,7 @@
 		}
 	}
 
-	public void readOnly(String ref) {
+	public void readOnly(String ref) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
@@ -138,7 +129,7 @@
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			throw new IllegalStateException();
+			throw INVALID_STATE_SOAP_FAULT;
 
 		case AT2PCStatus.ABORTING:
 			forget2PC(ref);
@@ -148,7 +139,7 @@
 		}
 	}
 
-	public void replay(String ref) {
+	public void replay(String ref) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
@@ -157,110 +148,120 @@
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			EndpointReference epr = getEpr(ref);
-			try {
-				new ParticipantStub(epr).commitOperation(null);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+			EndpointReference epr = getEprOf2PCParticipant(ref);
+			if (epr != null)
+				try {
+					new ParticipantStub(epr).commitOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 			return;
 
 		case AT2PCStatus.ABORTING:
-			epr = getEpr(ref);
-			try {
-				new ParticipantStub(epr).rollbackOperation(null);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+			epr = getEprOf2PCParticipant(ref);
+			if (epr != null)
+				try {
+					new ParticipantStub(epr).rollbackOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 			return;
 
 		case AT2PCStatus.NONE:
-			epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
+			if (volatile2PCParticipants.containsKey(ref))
+				throw INVALID_STATE_SOAP_FAULT;
+			epr = (EndpointReference) durable2PCParticipants.get(ref);
 			if (epr != null)
-				throw new IllegalStateException();
-			epr = (EndpointReference) participants2PC[DURABLE].get(ref);
-			try {
-				new ParticipantStub(epr).rollbackOperation(null);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+				try {
+					new ParticipantStub(epr).rollbackOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 		}
 	}
 
-	public void prepared(String ref) {
+	public void prepared(String ref) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 			rollback();
-			throw new IllegalStateException();
+			throw INVALID_STATE_SOAP_FAULT;
 
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			prepared.add(ref);
+			preparedParticipants.add(ref);
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			EndpointReference epr = getEpr(ref);
-			try {
-				new ParticipantStub(epr).commitOperation(null);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+			EndpointReference epr = getEprOf2PCParticipant(ref);
+			if (epr != null)
+				try {
+					new ParticipantStub(epr).commitOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 			return;
 
 		case AT2PCStatus.ABORTING:
-		case AT2PCStatus.NONE:
-			epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
+			if (volatile2PCParticipants.remove(ref) != null) 
+				throw INVALID_STATE_SOAP_FAULT;
+			epr = (EndpointReference) durable2PCParticipants.remove(ref);
 			if (epr != null) {
-				if (status == AT2PCStatus.ABORTING)
-					forget2PC(ref);
-				throw new IllegalStateException();
-			}
-			epr = (EndpointReference) participants2PC[DURABLE].get(ref);
-			if (status == AT2PCStatus.ABORTING)
-				forget2PC(ref);
-			try {
-				new ParticipantStub(epr).rollbackOperation(null);
-			} catch (Exception e) {
-				e.printStackTrace();
+				try {
+					new ParticipantStub(epr).rollbackOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 			}
+			return;
+
+		case AT2PCStatus.NONE:
+			if (volatile2PCParticipants.containsKey(ref))
+				throw INVALID_STATE_SOAP_FAULT;
+			epr = (EndpointReference) durable2PCParticipants.get(ref);
+			if (epr != null)
+				try {
+					new ParticipantStub(epr).rollbackOperation(null);
+				} catch (Exception e) {
+					e.printStackTrace();
+				}
 		}
 	}
 
-	public void committed(String ref) {
+	public void committed(String ref) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
 			rollback();
-			throw new IllegalStateException();
+			throw INVALID_STATE_SOAP_FAULT;
 
 		case AT2PCStatus.COMMITTING:
 			forget2PC(ref);
 			return;
 
 		case AT2PCStatus.ABORTING:
-			throw new IllegalStateException();
+			throw INVALID_STATE_SOAP_FAULT;
 
 		case AT2PCStatus.NONE:
 		}
 	}
 
-	private EndpointReference getEpr(String ref) {
-		EndpointReference epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
-		if (epr == null)
-			epr = (EndpointReference) participants2PC[DURABLE].get(ref);
-		return epr;
+	private EndpointReference getEprOf2PCParticipant(String ref) {
+		EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(ref);
+		if (epr != null)
+			return epr;
+		return (EndpointReference) durable2PCParticipants.get(ref);
 	}
 
-	private boolean prepare(int prot) {
+	private boolean prepare(Map participants) {
 		int iters = 0;
 		int status_old = status;
 
-		while (iters < MAX_RETRIES) {
+		while (iters < maxRetries) {
 			if (iters++ > 0)
 				pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
 
-			Iterator iter = participants2PC[prot].values().iterator();
+			Iterator iter = participants.values().iterator();
 			while (iter.hasNext()) {
 				if (status == AT2PCStatus.ABORTING)
 					return false;
@@ -273,7 +274,7 @@
 
 			pause(RESPONSE_DELAY_MILLIS);
 
-			if (prepared.containsAll(participants2PC[prot].keySet()))
+			if (preparedParticipants.containsAll(participants.keySet()))
 				return status == status_old;
 		}
 
@@ -282,11 +283,11 @@
 
 	private boolean prepare() {
 		status = AT2PCStatus.PREPARING_VOLATILE;
-		if (!prepare(VOLATILE))
+		if (!prepare(volatile2PCParticipants))
 			return false;
 
 		status = AT2PCStatus.PREPARING_DURABLE;
-		return prepare(DURABLE);
+		return prepare(durable2PCParticipants);
 	}
 
 	public void commit() {
@@ -319,16 +320,21 @@
 		}
 	}
 
+	private boolean noParticipantsToTerminate() {
+		return volatile2PCParticipants.isEmpty()
+				&& durable2PCParticipants.isEmpty();
+	}
+
 	private void terminate() {
 		int iters = 0;
-		while (iters < MAX_RETRIES
-				&& !(participants2PC[VOLATILE].isEmpty() && participants2PC[DURABLE].isEmpty())) {
+		while (iters < maxRetries && !noParticipantsToTerminate()) {
+
 			if (iters++ > 0)
 				pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
 
-			for (int prot = VOLATILE; prot == VOLATILE || prot == DURABLE; prot = prot == VOLATILE ? DURABLE
-					: -1) {
-				Iterator iter = participants2PC[prot].values().iterator();
+			Map participants = volatile2PCParticipants;
+			while (true) {
+				Iterator iter = participants.values().iterator();
 				while (iter.hasNext())
 					try {
 						ParticipantStub p = new ParticipantStub(
@@ -340,14 +346,18 @@
 					} catch (Exception e) {
 						e.printStackTrace();
 					}
+				if (participants == volatile2PCParticipants)
+					participants = durable2PCParticipants;
+				else
+					break;
 			}
 
 			pause(RESPONSE_DELAY_MILLIS);
 		}
 
-		if (participants2PC[VOLATILE].isEmpty()
-				&& participants2PC[DURABLE].isEmpty()) {
-			Iterator iter = participantsComp.iterator();
+		if (noParticipantsToTerminate()) {
+
+			Iterator iter = completionParticipants.iterator();
 			while (iter.hasNext())
 				try {
 					CompletionInitiatorStub p = new CompletionInitiatorStub(
@@ -364,20 +374,9 @@
 		status = AT2PCStatus.NONE;
 	}
 
-	private AxisFault getInvalidStateSoapFault() {
-		QName subcode = new QName(
-				"http://schemas.xmlsoap.org/ws/2004/10/wscoor", "InvalidState");
-		String faultString = "The message was invalid for the current state of the activity.";
-		return new AxisFault(subcode, faultString, null, null);
-	}
-
 	public synchronized void preparedOperation(Notification parameters)
 			throws RemoteException {
-		try {
-			prepared(getParticipantRef());
-		} catch (IllegalStateException e) {
-			throw getInvalidStateSoapFault();
-		}
+		prepared(getParticipantRef());
 	}
 
 	public synchronized void abortedOperation(Notification parameters)
@@ -400,10 +399,14 @@
 		replay(getParticipantRef());
 	}
 
-	private synchronized String getParticipantRef() {
-		AddressingHeaders header = (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
+	private AddressingHeaders getAddressingHeaders() {
+		return (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
 			Constants.ENV_ADDRESSING_REQUEST_HEADERS);
-		MessageElement e = header.getReferenceProperties().get(PARTICIPANT_REF);
+	}
+
+	private String getParticipantRef() {
+		AddressingHeaders headers = getAddressingHeaders();
+		MessageElement e = headers.getReferenceProperties().get(PARTICIPANT_REF);
 		return e.getValue();
 	}
 
@@ -417,12 +420,13 @@
 		rollback();
 	}
 
-	public synchronized void timeout() {
+	public synchronized void timeout() throws TimedOutException {
 		System.out.println("[ATCoordinatorImpl] timeout "
 				+ AT2PCStatus.getStatusName(status));
-		if (status == AT2PCStatus.NONE)
-			return;
-		rollback();
-		throw new TimedOutException();
+		if (status != AT2PCStatus.NONE) {
+			maxRetries = 3;
+			rollback();
+			throw new TimedOutException();
+		}
 	}
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org