You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/07 06:33:42 UTC

svn commit: r366671 - in /webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at: ATCoordinatorImpl.java CompletionCoordinatorStub.java CoordinatorImpl.java TransactionManagerImpl.java

Author: dasarath
Date: Fri Jan  6 21:33:39 2006
New Revision: 366671

URL: http://svn.apache.org/viewcvs?rev=366671&view=rev
Log: (empty)

Modified:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java Fri Jan  6 21:33:39 2006
@@ -4,6 +4,7 @@
  */
 package org.apache.ws.transaction.coordinator.at;
 
+import java.net.MalformedURLException;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -13,17 +14,18 @@
 import java.util.Map;
 import java.util.Set;
 
+import javax.xml.soap.Name;
+
 import org.apache.axis.AxisFault;
-import org.apache.axis.MessageContext;
 import org.apache.axis.components.uuid.UUIDGen;
 import org.apache.axis.components.uuid.UUIDGenFactory;
 import org.apache.axis.message.MessageElement;
 import org.apache.axis.message.addressing.AddressingHeaders;
-import org.apache.axis.message.addressing.Constants;
 import org.apache.axis.message.addressing.EndpointReference;
 import org.apache.axis.types.URI.MalformedURIException;
 import org.apache.ws.transaction.coordinator.CoordinationService;
 import org.apache.ws.transaction.coordinator.CoordinatorImpl;
+import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
 import org.apache.ws.transaction.coordinator.TimedOutException;
 import org.apache.ws.transaction.wsat.Notification;
 
@@ -53,36 +55,47 @@
 		super(COORDINATION_TYPE_ID);
 	}
 
-	public EndpointReference register(String prot, EndpointReference pps)
-			throws AxisFault {
+	public EndpointReference register(String protocol,
+			EndpointReference participantProtocolService)
+			throws InvalidCoordinationProtocolException {
+
 		if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
 			throw new IllegalStateException();
+
 		CoordinationService cs = CoordinationService.getInstance();
-		String ref = null;
+		String participantRef = null;
 		EndpointReference epr = null;
-		if (prot.equals(PROTOCOL_ID_COMPLETION)) {
-			if (pps != null)
-				completionParticipants.add(pps);
+
+		if (protocol.equals(PROTOCOL_ID_COMPLETION)) {
+			if (participantProtocolService != null)
+				completionParticipants.add(participantProtocolService);
+
 			epr = cs.getCompletionCoordinatorService(this);
 		} else {
-			if (pps == null)
+			if (participantProtocolService == null)
 				throw new IllegalArgumentException();
+
 			UUIDGen gen = UUIDGenFactory.getUUIDGen();
-			ref = "uuid:" + gen.nextUUID();
-			if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
-				volatile2PCParticipants.put(ref, pps);
-			else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
-				durable2PCParticipants.put(ref, pps);
+			participantRef = "uuid:" + gen.nextUUID();
+
+			if (protocol.equals(PROTOCOL_ID_VOLATILE_2PC))
+				volatile2PCParticipants.put(participantRef,
+					participantProtocolService);
+			else if (protocol.equals(PROTOCOL_ID_DURABLE_2PC))
+				durable2PCParticipants.put(participantRef,
+					participantProtocolService);
 			else
-				throw INVALID_PROTOCOL_SOAP_FAULT;
-			epr = cs.getCoordinatorService(this, ref);
+				throw new InvalidCoordinationProtocolException();
+
+			epr = cs.getCoordinatorService(this, participantRef);
 		}
+
 		return epr;
 	}
 
-	public void forget2PC(String ref) {
-		if (volatile2PCParticipants.remove(ref) == null)
-			durable2PCParticipants.remove(ref);
+	public void forget2PC(String participantRef) {
+		if (volatile2PCParticipants.remove(participantRef) == null)
+			durable2PCParticipants.remove(participantRef);
 	}
 
 	public void rollback() {
@@ -100,46 +113,53 @@
 		}
 	}
 
-	public void aborted(String ref) throws AxisFault {
+	public void aborted(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			forget2PC(ref);
+			forget2PC(participantRef);
 			rollback();
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			throw INVALID_STATE_SOAP_FAULT;
+			throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			return;
 
 		case AT2PCStatus.ABORTING:
-			forget2PC(ref);
+			forget2PC(participantRef);
 			return;
 
 		case AT2PCStatus.NONE:
 		}
 	}
 
-	public void readOnly(String ref) throws AxisFault {
+	public void readOnly(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			forget2PC(ref);
+			forget2PC(participantRef);
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			throw INVALID_STATE_SOAP_FAULT;
+			throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			return;
 
 		case AT2PCStatus.ABORTING:
-			forget2PC(ref);
+			forget2PC(participantRef);
 			return;
 
 		case AT2PCStatus.NONE:
 		}
 	}
 
-	public void replay(String ref) throws AxisFault {
+	private ParticipantStub getParticipantStub(String participantRef,
+			EndpointReference epr) throws AxisFault, MalformedURLException {
+		return new ParticipantStub(this, participantRef, epr);
+	}
+
+	public void replay(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
@@ -148,111 +168,131 @@
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			EndpointReference epr = getEprOf2PCParticipant(ref);
+			EndpointReference epr = getEprToRespond(participantRef);
 			if (epr != null)
 				try {
-					new ParticipantStub(epr).commitOperation(null);
+					getParticipantStub(participantRef, epr).commitOperation(
+						null);
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			return;
 
 		case AT2PCStatus.ABORTING:
-			epr = getEprOf2PCParticipant(ref);
+			epr = getEprToRespond(participantRef);
 			if (epr != null)
 				try {
-					new ParticipantStub(epr).rollbackOperation(null);
+					getParticipantStub(participantRef, epr).rollbackOperation(
+						null);
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			return;
 
 		case AT2PCStatus.NONE:
-			if (volatile2PCParticipants.containsKey(ref))
-				throw INVALID_STATE_SOAP_FAULT;
-			epr = (EndpointReference) durable2PCParticipants.get(ref);
-			if (epr != null)
-				try {
-					new ParticipantStub(epr).rollbackOperation(null);
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
+			if (volatile2PCParticipants.containsKey(participantRef))
+				throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			else {
+				epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+				if (epr == null)
+					epr = getReplyToEpr();
+				if (epr != null)
+					try {
+						getParticipantStub(participantRef, epr).rollbackOperation(
+							null);
+					} catch (Exception e) {
+						e.printStackTrace();
+					}
+			}
 		}
 	}
 
-	public void prepared(String ref) throws AxisFault {
+	public void prepared(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
-			rollback();
-			throw INVALID_STATE_SOAP_FAULT;
+			try {
+				throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			} finally {
+				rollback();
+			}
+			return;
 
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			preparedParticipants.add(ref);
+			preparedParticipants.add(participantRef);
 			return;
 
 		case AT2PCStatus.COMMITTING:
-			EndpointReference epr = getEprOf2PCParticipant(ref);
+			EndpointReference epr = getEprToRespond(participantRef);
 			if (epr != null)
 				try {
-					new ParticipantStub(epr).commitOperation(null);
+					getParticipantStub(participantRef, epr).commitOperation(
+						null);
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
 			return;
 
 		case AT2PCStatus.ABORTING:
-			if (volatile2PCParticipants.remove(ref) != null) 
-				throw INVALID_STATE_SOAP_FAULT;
-			epr = (EndpointReference) durable2PCParticipants.remove(ref);
-			if (epr != null) {
-				try {
-					new ParticipantStub(epr).rollbackOperation(null);
-				} catch (Exception e) {
-					e.printStackTrace();
+			if (volatile2PCParticipants.remove(participantRef) != null)
+				throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			else {
+				epr = (EndpointReference) durable2PCParticipants.remove(participantRef);
+				if (epr == null)
+					epr = getReplyToEpr();
+				if (epr != null) {
+					try {
+						getParticipantStub(participantRef, epr).rollbackOperation(
+							null);
+					} catch (Exception e) {
+						e.printStackTrace();
+					}
 				}
 			}
 			return;
 
 		case AT2PCStatus.NONE:
-			if (volatile2PCParticipants.containsKey(ref))
-				throw INVALID_STATE_SOAP_FAULT;
-			epr = (EndpointReference) durable2PCParticipants.get(ref);
-			if (epr != null)
-				try {
-					new ParticipantStub(epr).rollbackOperation(null);
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
+			if (volatile2PCParticipants.containsKey(participantRef))
+				throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			else {
+				epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+				if (epr == null)
+					epr = getReplyToEpr();
+				if (epr != null)
+					try {
+						getParticipantStub(participantRef, epr).rollbackOperation(
+							null);
+					} catch (Exception e) {
+						e.printStackTrace();
+					}
+			}
 		}
 	}
 
-	public void committed(String ref) throws AxisFault {
+	public void committed(String participantRef) throws AxisFault {
 		switch (status) {
 		case AT2PCStatus.ACTIVE:
 		case AT2PCStatus.PREPARING_VOLATILE:
 		case AT2PCStatus.PREPARING_DURABLE:
-			rollback();
-			throw INVALID_STATE_SOAP_FAULT;
+			try {
+				throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			} finally {
+				rollback();
+			}
+			return;
 
 		case AT2PCStatus.COMMITTING:
-			forget2PC(ref);
+			forget2PC(participantRef);
 			return;
 
 		case AT2PCStatus.ABORTING:
-			throw INVALID_STATE_SOAP_FAULT;
+			throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+			return;
 
 		case AT2PCStatus.NONE:
 		}
 	}
 
-	private EndpointReference getEprOf2PCParticipant(String ref) {
-		EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(ref);
-		if (epr != null)
-			return epr;
-		return (EndpointReference) durable2PCParticipants.get(ref);
-	}
-
 	private boolean prepare(Map participants) {
 		int iters = 0;
 		int status_old = status;
@@ -261,12 +301,15 @@
 			if (iters++ > 0)
 				pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
 
-			Iterator iter = participants.values().iterator();
+			Iterator iter = participants.keySet().iterator();
 			while (iter.hasNext()) {
 				if (status == AT2PCStatus.ABORTING)
 					return false;
 				try {
-					new ParticipantStub((EndpointReference) iter.next()).prepareOperation(null);
+					String participantRef = (String) iter.next();
+					getParticipantStub(participantRef,
+						(EndpointReference) participants.get(participantRef)).prepareOperation(
+						null);
 				} catch (Exception e) {
 					e.printStackTrace();
 				}
@@ -334,11 +377,13 @@
 
 			Map participants = volatile2PCParticipants;
 			while (true) {
-				Iterator iter = participants.values().iterator();
+				Iterator iter = participants.keySet().iterator();
 				while (iter.hasNext())
 					try {
-						ParticipantStub p = new ParticipantStub(
-								(EndpointReference) iter.next());
+						String participantRef = (String) iter.next();
+						ParticipantStub p = getParticipantStub(
+							participantRef,
+							(EndpointReference) participants.get(participantRef));
 						if (status == AT2PCStatus.ABORTING)
 							p.rollbackOperation(null);
 						else
@@ -399,9 +444,21 @@
 		replay(getParticipantRef());
 	}
 
-	private AddressingHeaders getAddressingHeaders() {
-		return (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
-			Constants.ENV_ADDRESSING_REQUEST_HEADERS);
+	private EndpointReference getEprToSendFault(String participantRef) {
+		EndpointReference epr = getFaultToEpr();
+		if (epr != null)
+			return epr;
+		return getEprToRespond(participantRef);
+	}
+
+	private EndpointReference getEprToRespond(String participantRef) {
+		EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(participantRef);
+		if (epr != null)
+			return epr;
+		epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+		if (epr != null)
+			return epr;
+		return getReplyToEpr();
 	}
 
 	private String getParticipantRef() {
@@ -423,10 +480,31 @@
 	public synchronized void timeout() throws TimedOutException {
 		System.out.println("[ATCoordinatorImpl] timeout "
 				+ AT2PCStatus.getStatusName(status));
+
 		if (status != AT2PCStatus.NONE) {
 			maxRetries = 3;
 			rollback();
 			throw new TimedOutException();
 		}
 	}
+
+	private void throwFault(String participantRef, AxisFault fault)
+			throws AxisFault {
+		throwFault(getEprToSendFault(participantRef), fault);
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * 
+	 * @see org.apache.ws.transaction.coordinator.Callback#onFault(javax.xml.soap.Name)
+	 */
+	public synchronized void onFault(Name code) {
+		// TODO Auto-generated method stub
+
+	}
+
+	public EndpointReference getEndpointReference() {
+		throw new UnsupportedOperationException();
+	}
+
 }

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java Fri Jan  6 21:33:39 2006
@@ -20,6 +20,10 @@
 import java.net.URL;
 
 import org.apache.axis.AxisFault;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.Callback;
+import org.apache.ws.transaction.coordinator.CoordinationService;
+import org.apache.ws.transaction.utility.AddressingHeaders;
 import org.apache.ws.transaction.utility.Service;
 import org.apache.ws.transaction.utility.TCPSnifferHelper;
 import org.apache.ws.transaction.wsat.CompletionCoordinatorBindingStub;
@@ -31,10 +35,17 @@
  */
 public class CompletionCoordinatorStub extends CompletionCoordinatorBindingStub {
 
-	public CompletionCoordinatorStub(
-			org.apache.axis.message.addressing.EndpointReference epr)
+	public CompletionCoordinatorStub(Callback callback, EndpointReference epr)
 			throws AxisFault, MalformedURLException {
 		super(new URL(TCPSnifferHelper.redirect(epr.getAddress().toString())),
-				new Service(epr));
+				new Service());
+
+		CoordinationService cs = CoordinationService.getInstance();
+
+		AddressingHeaders headers = new AddressingHeaders(epr,
+				callback.getEndpointReference());
+		headers.setFaultTo(cs.getFaultDispatcherService(callback));
+		((Service) service).setAddressingHeaders(headers);
+
 	}
 }

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java Fri Jan  6 21:33:39 2006
@@ -6,7 +6,7 @@
 
 import java.rmi.RemoteException;
 
-import org.apache.ws.transaction.utility.CallbackRegistry;
+import org.apache.ws.transaction.coordinator.CallbackRegistry;
 import org.apache.ws.transaction.wsat.CoordinatorPortType;
 import org.apache.ws.transaction.wsat.Notification;
 
@@ -18,31 +18,31 @@
 
 	public void preparedOperation(Notification params) throws RemoteException {
 		ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
-			CallbackRegistry.COORDINATOR_REF, false);
+			CallbackRegistry.CALLBACK_REF, false);
 		c.preparedOperation(params);
 	}
 
 	public void abortedOperation(Notification params) throws RemoteException {
 		ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
-			CallbackRegistry.COORDINATOR_REF, false);
+			CallbackRegistry.CALLBACK_REF, false);
 		c.abortedOperation(params);
 	}
 
 	public void readOnlyOperation(Notification params) throws RemoteException {
 		ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
-			CallbackRegistry.COORDINATOR_REF, false);
+			CallbackRegistry.CALLBACK_REF, false);
 		c.readOnlyOperation(params);
 	}
 
 	public void committedOperation(Notification params) throws RemoteException {
 		ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
-			CallbackRegistry.COORDINATOR_REF, false);
+			CallbackRegistry.CALLBACK_REF, false);
 		c.committedOperation(params);
 	}
 
 	public void replayOperation(Notification params) throws RemoteException {
 		ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
-			CallbackRegistry.COORDINATOR_REF, false);
+			CallbackRegistry.CALLBACK_REF, false);
 		c.replayOperation(params);
 	}
 }

Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java Fri Jan  6 21:33:39 2006
@@ -37,7 +37,7 @@
 	}
 
 	public void begin() throws RemoteException {
-		begin(CoordinationService.getInstance().getActivationService());
+		begin(CoordinationService.getInstance().getActivationCoordinatorService());
 	}
 
 	public void begin(EndpointReference epr) throws RemoteException {



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org