You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/02 20:08:55 UTC

svn commit: r365396 - in /webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at: AbstractParticipant.java BasicParticipant.java TransactionImpl.java

Author: dasarath
Date: Mon Jan  2 11:08:53 2006
New Revision: 365396

URL: http://svn.apache.org/viewcvs?rev=365396&view=rev
Log: (empty)

Added:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java
Removed:
    webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/BasicParticipant.java

Added: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java?rev=365396&view=auto
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java (added)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java Mon Jan  2 11:08:53 2006
@@ -0,0 +1,208 @@
+/*
+ * Created on Dec 30, 2005
+ *
+ */
+package org.apache.ws.transaction.coordinator.at;
+
+import java.rmi.RemoteException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.CoordinationContext;
+import org.apache.ws.transaction.coordinator.Coordinator;
+import org.apache.ws.transaction.coordinator.ParticipantService;
+import org.apache.ws.transaction.coordinator.TimedOutException;
+import org.apache.ws.transaction.utility.Callback;
+import org.apache.ws.transaction.wsat.CoordinatorPortType;
+import org.apache.ws.transaction.wsat.Notification;
+import org.apache.ws.transaction.wsat.ParticipantPortType;
+import org.apache.ws.transaction.wscoor.Expires;
+
+/**
+ * @author Dasarath Weeratunge
+ *  
+ */
+public abstract class AbstractParticipant implements ParticipantPortType,
+		Callback {
+
+	private static Timer timer = new Timer();
+
+	public static final int RETRY_DELAY_MILLIS = 10 * 1000;
+
+	private EndpointReference eprOfCoordinator;
+
+	protected abstract int prepare() throws XAException;
+
+	protected abstract void commit() throws XAException;
+
+	protected abstract void rollback() throws XAException;
+
+	protected abstract void forget();
+
+	protected abstract int getStatus();
+
+	protected void register(boolean durable, CoordinationContext ctx)
+			throws RemoteException {
+		long timeout = 0;
+		Expires ex = ctx.getExpires();
+		if (ex != null)
+			timeout = ex.get_value().longValue();
+		EndpointReference epr = ParticipantService.getInstance().getParticipantService(
+			this, timeout);
+		eprOfCoordinator = ctx.register(
+			durable ? ATCoordinator.PROTOCOL_ID_DURABLE_2PC
+					: ATCoordinator.PROTOCOL_ID_VOLATILE_2PC, epr);
+	}
+
+	protected CoordinatorPortType getCoordinator() {
+		try {
+			return new CoordinatorStub(eprOfCoordinator);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new RuntimeException(e);
+		}
+	}
+
+	public synchronized void prepareOperation(Notification parameters)
+			throws RemoteException {
+		switch (getStatus()) {
+		case AT2PCStatus.NONE:
+			getCoordinator().abortedOperation(null);
+			return;
+
+		case AT2PCStatus.ACTIVE:
+			final CoordinatorPortType p = getCoordinator();
+			try {
+				if (prepare() == XAResource.XA_RDONLY) {
+					forget();
+					p.readOnlyOperation(null);
+				} else {
+					p.preparedOperation(null);
+					timer.schedule(new TimerTask() {
+						public void run() {
+							switch (getStatus()) {
+							case AT2PCStatus.NONE:
+							case AT2PCStatus.ACTIVE:
+							case AT2PCStatus.PREPARING:
+							case AT2PCStatus.ABORTING:
+							case AT2PCStatus.COMMITTING:
+								cancel();
+								return;
+
+							case AT2PCStatus.PREPARED:
+								try {
+									p.preparedOperation(null);
+								} catch (RemoteException e) {
+									// TODO:
+									// identify wscoor:InvalidState Soap fault and stop
+									e.printStackTrace();								
+								}
+							}
+						}
+					}, RETRY_DELAY_MILLIS, RETRY_DELAY_MILLIS);
+				}
+			} catch (XAException e) {
+				forget();
+				p.abortedOperation(null);
+			}
+			return;
+
+		case AT2PCStatus.PREPARING:
+			return;
+
+		case AT2PCStatus.PREPARED:
+			getCoordinator().preparedOperation(null);
+			return;
+
+		case AT2PCStatus.ABORTING:
+			forget();
+			getCoordinator().abortedOperation(null);
+			return;
+
+		case AT2PCStatus.COMMITTING:
+		}
+	}
+
+	public synchronized void commitOperation(Notification parameters)
+			throws RemoteException {
+		switch (getStatus()) {
+		case AT2PCStatus.NONE:
+			getCoordinator().committedOperation(null);
+			return;
+
+		case AT2PCStatus.ACTIVE:
+		case AT2PCStatus.PREPARING:
+			try {
+				rollback();
+				forget();
+				getCoordinator().abortedOperation(null);
+			} catch (XAException e) {
+				e.printStackTrace();
+			}
+			return;
+
+		case AT2PCStatus.PREPARED:
+			try {
+				commit();
+				forget();
+				getCoordinator().committedOperation(null);
+			} catch (XAException e) {
+				e.printStackTrace();
+			}
+			return;
+
+		case AT2PCStatus.ABORTING:
+			throw Coordinator.INVALID_STATE_SOAP_FAULT;
+
+		case AT2PCStatus.COMMITTING:
+		}
+	}
+
+	public synchronized void rollbackOperation(Notification parameters)
+			throws RemoteException {
+		switch (getStatus()) {
+		case AT2PCStatus.NONE:
+			getCoordinator().abortedOperation(null);
+			return;
+
+		case AT2PCStatus.ACTIVE:
+		case AT2PCStatus.PREPARING:
+		case AT2PCStatus.PREPARED:
+			try {
+				rollback();
+				forget();
+				getCoordinator().abortedOperation(null);
+			} catch (XAException e) {
+				e.printStackTrace();
+			}
+			return;
+
+		case AT2PCStatus.ABORTING:
+			forget();
+			getCoordinator().abortedOperation(null);
+			return;
+
+		case AT2PCStatus.COMMITTING:
+			throw Coordinator.INVALID_STATE_SOAP_FAULT;
+		}
+	}
+
+	public void timeout() throws TimedOutException {
+		System.out.println("[AbstractParticipant] timeout "
+				+ AT2PCStatus.getStatusName(getStatus()));
+		if (getStatus() == AT2PCStatus.NONE)
+			return;
+		try {
+			rollback();
+			forget();
+			getCoordinator().abortedOperation(null);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		throw new TimedOutException();
+	}
+}
\ No newline at end of file

Added: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java?rev=365396&view=auto
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java (added)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java Mon Jan  2 11:08:53 2006
@@ -0,0 +1,162 @@
+/*
+ * Copyright  2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.ws.transaction.coordinator.at;
+
+import java.rmi.RemoteException;
+
+import javax.transaction.RollbackException;
+
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.CoordinationContext;
+import org.apache.ws.transaction.coordinator.ParticipantService;
+import org.apache.ws.transaction.coordinator.TimedOutException;
+import org.apache.ws.transaction.utility.Callback;
+import org.apache.ws.transaction.wsat.CompletionInitiatorPortType;
+import org.apache.ws.transaction.wsat.Notification;
+import org.apache.ws.transaction.wscoor.Expires;
+
+/**
+ * @author Dasarath Weeratunge
+ */
+/**
+ * Client-side handle on a transaction identified by a
+ * {@link CoordinationContext}. It enlists participants and drives completion
+ * (commit or rollback) through the completion protocol registered with
+ * <code>ATCoordinator.PROTOCOL_ID_COMPLETION</code>.
+ * <p>
+ * The outcome flags below are written by the nested callback while holding
+ * the callback's monitor and are read by {@link #commit()} and
+ * {@link #rollback()} under the same monitor.
+ */
+public class TransactionImpl {
+
+	/** Coordination context this transaction was created for. */
+	private CoordinationContext ctx;
+
+	/** Endpoint returned when registering for the completion protocol. */
+	private EndpointReference eprOfCompletionCoordinator;
+
+	/** Receives the outcome notifications; also the monitor waited on. */
+	private CompletionInitiatorCallback callback;
+
+	/** Set when the coordinator reported Aborted. */
+	private boolean aborted = false;
+
+	/** Set when the coordinator reported Committed. */
+	private boolean committed = false;
+
+	/** Set when the callback's timeout fired. */
+	private boolean timedOut = false;
+
+	/** True once {@link #register()} has completed successfully. */
+	private boolean canInitiateCompletion = false;
+
+	/**
+	 * @param ctx
+	 *            coordination context of the transaction
+	 */
+	protected TransactionImpl(CoordinationContext ctx) {
+		this.ctx = ctx;
+	}
+
+	/**
+	 * Registers a fresh callback for the completion protocol and stores the
+	 * completion coordinator's endpoint. The context's Expires value, when
+	 * present, is passed on as the timeout (0 otherwise).
+	 * 
+	 * @throws RemoteException
+	 *             if registration fails; <code>canInitiateCompletion</code>
+	 *             then stays <code>false</code>
+	 */
+	private void register() throws RemoteException {
+		long timeout = 0;
+		Expires ex = ctx.getExpires();
+		if (ex != null)
+			timeout = ex.get_value().longValue();
+		callback = new CompletionInitiatorCallback();
+		eprOfCompletionCoordinator = ctx.register(
+			ATCoordinator.PROTOCOL_ID_COMPLETION,
+			ParticipantService.getInstance().getCompletionInitiatorService(
+				callback, timeout));
+		canInitiateCompletion = true;
+	}
+
+	/**
+	 * Records the outcome reported by the coordinator (or a timeout) in the
+	 * enclosing transaction and wakes every thread waiting on this object.
+	 */
+	private class CompletionInitiatorCallback implements
+			CompletionInitiatorPortType, Callback {
+		public synchronized void committedOperation(Notification parameters)
+				throws RemoteException {
+			committed = true;
+			// notifyAll: more than one caller may be waiting for the outcome.
+			notifyAll();
+		}
+
+		public synchronized void abortedOperation(Notification parameters)
+				throws RemoteException {
+			aborted = true;
+			notifyAll();
+		}
+
+		public synchronized void timeout() {
+			timedOut = true;
+			notifyAll();
+		}
+	}
+
+	/**
+	 * @return the coordination context passed to the constructor
+	 */
+	protected CoordinationContext getCoordinationContex() {
+		return ctx;
+	}
+
+	/**
+	 * Registers <code>participant</code> with this transaction's coordination
+	 * context. The transaction manager's current transaction is suspended
+	 * for the duration of the registration and is resumed afterwards, also
+	 * when the registration fails.
+	 * 
+	 * @param durable
+	 *            passed through to the participant's registration to select
+	 *            the durable or volatile protocol
+	 * @param participant
+	 *            participant to register
+	 * @throws RemoteException
+	 *             if the registration fails
+	 */
+	public void enlistParticipant(boolean durable,
+			AbstractParticipant participant) throws RemoteException {
+
+		TransactionManagerImpl tm = TransactionManagerImpl.getInstance();
+		TransactionImpl tx = tm.suspend();
+		try {
+			participant.register(durable, ctx);
+		} finally {
+			// Resume even on failure so the caller's transaction is not lost.
+			tm.resume(tx);
+		}
+	}
+
+	/**
+	 * Asks the completion coordinator to roll the transaction back and blocks
+	 * until an outcome (or a timeout) is reported. Returns immediately if
+	 * Aborted has already been reported.
+	 * 
+	 * @throws RemoteException
+	 *             if registration or the rollback request fails
+	 * @throws RuntimeException
+	 *             wrapping <code>IllegalStateException</code> (already
+	 *             committed), <code>TimedOutException</code>,
+	 *             <code>RollbackException</code> (outcome was not Aborted) or
+	 *             <code>InterruptedException</code>, unless the wrapped type
+	 *             is itself a <code>RemoteException</code>
+	 */
+	public void rollback() throws RemoteException {
+		complete(false);
+	}
+
+	/**
+	 * Asks the completion coordinator to commit the transaction and blocks
+	 * until an outcome (or a timeout) is reported. Returns immediately if
+	 * Committed has already been reported.
+	 * 
+	 * @throws RemoteException
+	 *             if registration or the commit request fails
+	 * @throws RuntimeException
+	 *             wrapping <code>IllegalStateException</code> (already
+	 *             aborted), <code>TimedOutException</code>,
+	 *             <code>RollbackException</code> (outcome was not Committed)
+	 *             or <code>InterruptedException</code>, unless the wrapped
+	 *             type is itself a <code>RemoteException</code>
+	 */
+	public void commit() throws RemoteException {
+		complete(true);
+	}
+
+	/**
+	 * Shared driver for {@link #commit()} and {@link #rollback()}: suspends
+	 * the current transaction, registers for completion on first use, runs
+	 * the request and always resumes the suspended transaction.
+	 * 
+	 * @param commit
+	 *            <code>true</code> to commit, <code>false</code> to roll back
+	 * @throws RemoteException
+	 *             if registration or the request fails
+	 */
+	private void complete(boolean commit) throws RemoteException {
+		TransactionManagerImpl tm = TransactionManagerImpl.getInstance();
+		TransactionImpl tx = tm.suspend();
+		try {
+			// Inside the try so that a failed registration still resumes tx.
+			if (!canInitiateCompletion)
+				register();
+			requestAndAwaitOutcome(commit);
+		} finally {
+			tm.resume(tx);
+		}
+	}
+
+	/**
+	 * Sends the completion request (unless the wanted outcome is already
+	 * known) and waits on the callback until an outcome flag is set.
+	 * 
+	 * @param commit
+	 *            <code>true</code> to commit, <code>false</code> to roll back
+	 * @throws RemoteException
+	 *             if the request fails
+	 */
+	private void requestAndAwaitOutcome(boolean commit)
+			throws RemoteException {
+		try {
+			synchronized (callback) {
+				if (!(commit ? committed : aborted)) {
+					// The opposite outcome has already been reported.
+					if (commit ? aborted : committed)
+						throw new IllegalStateException();
+					CompletionCoordinatorStub stub = new CompletionCoordinatorStub(
+						eprOfCompletionCoordinator);
+					if (commit)
+						stub.commitOperation(null);
+					else
+						stub.rollbackOperation(null);
+					// Loop: Object.wait() may return without a notification.
+					while (!committed && !aborted && !timedOut)
+						callback.wait();
+				}
+				// Checked while still holding the monitor the flags are
+				// written under.
+				if (timedOut)
+					throw new TimedOutException();
+				if (!(commit ? committed : aborted))
+					throw new RollbackException();
+			}
+		} catch (RemoteException e) {
+			throw e;
+		} catch (InterruptedException e) {
+			// Preserve the interrupt for callers further up the stack.
+			Thread.currentThread().interrupt();
+			e.printStackTrace();
+			throw new RuntimeException(e);
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new RuntimeException(e);
+		}
+	}
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org