You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by da...@apache.org on 2005/12/24 03:06:27 UTC

svn commit: r358904 - /webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java

Author: dasarath
Date: Fri Dec 23 18:06:26 2005
New Revision: 358904

URL: http://svn.apache.org/viewcvs?rev=358904&view=rev
Log: (empty)

Added:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java

Added: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java?rev=358904&view=auto
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java (added)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AtMsgCoordinatorImpl.java Fri Dec 23 18:06:26 2005
@@ -0,0 +1,415 @@
+/*
+ * Created on Dec 23, 2005
+ *
+ */
+package org.apache.ws.transaction.coordinator.at;
+
+import java.rmi.RemoteException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.axis.components.uuid.UUIDGen;
+import org.apache.axis.components.uuid.UUIDGenFactory;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.CoordinationService;
+import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
+import org.apache.ws.transaction.coordinator.MsgCoordinatorImpl;
+import org.apache.ws.transaction.wsat.CompletionInitiatorPort;
+import org.apache.ws.transaction.wsat.ParticipantPort;
+import org.apache.ws.transaction.wscoor.RegistrationRequesterPort;
+
+/**
+ * @author Dasarath Weeratunge
+ *  
+ */
+public class AtMsgCoordinatorImpl extends MsgCoordinatorImpl implements
+		AtMsgCoordinator {
+
+	int status = At2PCStatus.NONE;
+
+	private static final int COMPLETION = 0;
+
+	private static final int VOLATILE_2PC = 1;
+
+	private static final int DURABLE_2PC = 2;
+
+	Map participants[] = new Map[3];
+
+	Map prepared = Collections.synchronizedMap(new HashMap());
+
+	public static final int MAX_ITERS = 3;
+
+	public static final int ITER_DELAY = 15 * 1000;
+
+	public static final int RESPONSE_DELAY = 5 * 1000;
+
+	protected AtMsgCoordinatorImpl(CoordinationService cs) {
+		super(cs);
+		status = At2PCStatus.ACTIVE;
+		for (int i = 0; i < 3; i++)
+			participants[i] = Collections.synchronizedMap(new HashMap());
+	}
+
+	public String getCoordinationType() {
+		return COORDINATION_TYPE;
+	}
+
+	public void register(String prot, EndpointReference pps,
+			EndpointReference res) throws InvalidCoordinationProtocolException {
+		UUIDGen gen = UUIDGenFactory.getUUIDGen();
+		CoordinationService cs = getCoordinationService();
+		String ref = null;
+		EndpointReference epr = null;
+		if (prot.equals(PROTOCOL_ID_COMPLETION))
+			if (status == At2PCStatus.ACTIVE) {
+				ref = gen.nextUUID();
+				participants[COMPLETION].put(ref, pps);
+				epr = cs.getCompletionCoordinatorService();
+			} else
+				return;
+		else {
+			if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
+				if (status == At2PCStatus.ACTIVE
+						|| status == At2PCStatus.PREPARING_VOLATILE) {
+					ref = gen.nextUUID();
+					participants[VOLATILE_2PC].put(ref, pps);
+				} else
+					return;
+			else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
+				if (status == At2PCStatus.ACTIVE) {
+					ref = gen.nextUUID();
+					participants[DURABLE_2PC].put(ref, pps);
+				} else
+					return;
+			else
+				throw new InvalidCoordinationProtocolException();
+			epr = cs.getCoordinatorService();
+			epr.setProperties(cs.getReferenceProperties(this, ref));
+		}
+
+		try {
+			new RegistrationRequesterPort(res).registerResponseOperation(epr);
+		} catch (RemoteException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	public void forget2PC(String ref) {
+		if (participants[VOLATILE_2PC].get(ref) != null)
+			participants[VOLATILE_2PC].remove(ref);
+		else
+			participants[DURABLE_2PC].remove(ref);
+	}
+
+	public void rollback() {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			status = At2PCStatus.ABORTING;
+			break;
+
+		case At2PCStatus.COMMITTING:
+		case At2PCStatus.ABORTING:
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+
+		terminate();
+	}
+
+	public void aborted(String ref) {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			forget2PC(ref);
+			rollback();
+			return;
+
+		case At2PCStatus.COMMITTING:
+			throw new IllegalStateException();
+
+		case At2PCStatus.ABORTING:
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+
+	}
+
+	public void readOnly(String ref) {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.COMMITTING:
+			throw new IllegalStateException();
+
+		case At2PCStatus.ABORTING:
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+	}
+
+	public void replay(String ref) {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+			rollback();
+			return;
+
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			status = At2PCStatus.ABORTING;
+			return;
+
+		case At2PCStatus.COMMITTING:
+			EndpointReference epr = get2PCEndpointReference(ref);
+			try {
+				new ParticipantPort(epr).commitOperation();
+			} catch (RemoteException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			return;
+
+		case At2PCStatus.ABORTING:
+			epr = get2PCEndpointReference(ref);
+			try {
+				new ParticipantPort(epr).rollbackOperation();
+			} catch (RemoteException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+	}
+
+	public void prepared(String ref) {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+			rollback();
+			throw new IllegalStateException();
+
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			prepared.put(ref, ref);
+			return;
+
+		case At2PCStatus.COMMITTING:
+			EndpointReference epr = get2PCEndpointReference(ref);
+			try {
+				new ParticipantPort(epr).commitOperation();
+			} catch (RemoteException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			return;
+
+		case At2PCStatus.ABORTING:
+			epr = get2PCEndpointReference(ref);
+			try {
+				new ParticipantPort(epr).rollbackOperation();
+			} catch (RemoteException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+	}
+
+	public void committed(String ref) {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+			rollback();
+			throw new IllegalStateException();
+
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+			status = At2PCStatus.ABORTING;
+			throw new IllegalStateException();
+
+		case At2PCStatus.COMMITTING:
+			forget2PC(ref);
+			return;
+
+		case At2PCStatus.ABORTING:
+			throw new IllegalStateException();
+
+		case At2PCStatus.ENDED:
+			return;
+
+		}
+
+	}
+
+	private EndpointReference get2PCEndpointReference(String ref) {
+		EndpointReference epr = (EndpointReference) participants[VOLATILE_2PC]
+				.get(ref);
+		if (epr == null)
+			epr = (EndpointReference) participants[DURABLE_2PC].get(ref);
+		return epr;
+	}
+
+	private boolean prepare(int prot) {
+		int iters = 0;
+		while (iters < MAX_ITERS) {
+			if (iters > 0)
+				pause(ITER_DELAY - RESPONSE_DELAY);
+
+			synchronized (participants[prot]) {
+				Iterator iter = participants[prot].values().iterator();
+				while (iter.hasNext()) {
+					if (status == At2PCStatus.ABORTING)
+						return false;
+					try {
+						new ParticipantPort((EndpointReference) iter.next())
+								.prepareOperation();
+					} catch (RemoteException e1) {
+						// TODO Auto-generated catch block
+						e1.printStackTrace();
+					}
+				}
+			}
+
+			pause(RESPONSE_DELAY);
+
+			boolean allPrepared = true;
+			synchronized (participants[prot]) {
+				Iterator iter = participants[prot].keySet().iterator();
+				while (iter.hasNext())
+					if (!prepared.containsKey(iter.next())) {
+						allPrepared = false;
+						break;
+					}
+			}
+
+			if (allPrepared)
+				break;
+
+			iters++;
+		}
+
+		return iters < MAX_ITERS;
+	}
+
+	private boolean prepare() {
+		prepared.clear();
+		status = At2PCStatus.PREPARING_VOLATILE;
+		if (!prepare(VOLATILE_2PC))
+			return false;
+
+		prepared.clear();
+		status = At2PCStatus.PREPARING_DURABLE;
+		return prepare(DURABLE_2PC);
+	}
+
+	public void commit() {
+		switch (status) {
+		case At2PCStatus.ACTIVE:
+			break;
+
+		case At2PCStatus.PREPARING_VOLATILE:
+		case At2PCStatus.PREPARING_DURABLE:
+		case At2PCStatus.COMMITTING:
+		case At2PCStatus.ABORTING:
+		case At2PCStatus.ENDED:
+			return;
+		}
+
+		if (!prepare()) {
+			rollback();
+			return;
+		}
+
+		status = At2PCStatus.COMMITTING;
+		terminate();
+	}
+
+	private void pause(long millis) {
+		try {
+			Thread.sleep(millis);
+		} catch (InterruptedException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+
+	private void terminate() {
+		int iters = 0;
+		while (iters < MAX_ITERS
+				&& !(participants[VOLATILE_2PC].isEmpty() && participants[DURABLE_2PC]
+						.isEmpty())) {
+			if (iters > 0)
+				pause(ITER_DELAY - RESPONSE_DELAY);
+			iters++;
+
+			for (int prot = VOLATILE_2PC; prot == VOLATILE_2PC
+					|| prot == DURABLE_2PC; prot = prot == VOLATILE_2PC ? DURABLE_2PC
+					: -1) {
+				synchronized (participants[prot]) {
+					Iterator iter = participants[prot].values().iterator();
+					while (iter.hasNext())
+						try {
+							ParticipantPort p = new ParticipantPort(
+									(EndpointReference) iter.next());
+							if (status == At2PCStatus.ABORTING)
+								p.rollbackOperation();
+							else
+								p.commitOperation();
+						} catch (RemoteException e1) {
+							// TODO Auto-generated catch block
+							e1.printStackTrace();
+						}
+				}
+			}
+
+			pause(RESPONSE_DELAY);
+		}
+
+		if (participants[VOLATILE_2PC].isEmpty()
+				&& participants[DURABLE_2PC].isEmpty()) {
+			synchronized (participants[COMPLETION]) {
+				Iterator iter = participants[COMPLETION].values().iterator();
+				while (iter.hasNext())
+					try {
+						CompletionInitiatorPort p = new CompletionInitiatorPort(
+								(EndpointReference) iter.next());
+						if (status == At2PCStatus.ABORTING)
+							p.abortedOperation();
+						else
+							p.committedOperation();
+					} catch (RemoteException e1) {
+						// TODO Auto-generated catch block
+						e1.printStackTrace();
+					}
+			}
+		}
+
+		status = At2PCStatus.ENDED;
+	}
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org