You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/02 20:08:55 UTC
svn commit: r365396 - in
/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at:
AbstractParticipant.java BasicParticipant.java TransactionImpl.java
Author: dasarath
Date: Mon Jan 2 11:08:53 2006
New Revision: 365396
URL: http://svn.apache.org/viewcvs?rev=365396&view=rev
Log: (empty)
Added:
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java
Removed:
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/BasicParticipant.java
Added: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java?rev=365396&view=auto
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java (added)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/AbstractParticipant.java Mon Jan 2 11:08:53 2006
@@ -0,0 +1,208 @@
+/*
+ * Created on Dec 30, 2005
+ *
+ */
+package org.apache.ws.transaction.coordinator.at;
+
+import java.rmi.RemoteException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.CoordinationContext;
+import org.apache.ws.transaction.coordinator.Coordinator;
+import org.apache.ws.transaction.coordinator.ParticipantService;
+import org.apache.ws.transaction.coordinator.TimedOutException;
+import org.apache.ws.transaction.utility.Callback;
+import org.apache.ws.transaction.wsat.CoordinatorPortType;
+import org.apache.ws.transaction.wsat.Notification;
+import org.apache.ws.transaction.wsat.ParticipantPortType;
+import org.apache.ws.transaction.wscoor.Expires;
+
+/**
+ * Base class for 2PC participants: dispatches Prepare/Commit/Rollback on getStatus() and replies via a CoordinatorStub.
+ * @author Dasarath Weeratunge
+ */
+public abstract class AbstractParticipant implements ParticipantPortType,
+ Callback {
+
+ private static Timer timer = new Timer(); // one timer shared by every participant; runs the "prepared" resend tasks
+
+ public static final int RETRY_DELAY_MILLIS = 10 * 1000; // used as both initial delay and period of the resend task
+
+ private EndpointReference eprOfCoordinator; // assigned in register() from the value returned by ctx.register(...)
+ /** Subclass hook run by prepareOperation in state ACTIVE; XAResource.XA_RDONLY leads to a read-only reply. */
+ protected abstract int prepare() throws XAException;
+ /** Subclass hook run by commitOperation in state PREPARED. */
+ protected abstract void commit() throws XAException;
+ /** Subclass hook run by commitOperation (ACTIVE/PREPARING), rollbackOperation and timeout. */
+ protected abstract void rollback() throws XAException;
+ /** Subclass hook run just before most final replies (never in state NONE); presumably releases state - confirm. */
+ protected abstract void forget();
+ /** Subclass hook returning the current state as one of the AT2PCStatus constants switched on below. */
+ protected abstract int getStatus();
+ /** Obtains an endpoint for this participant and registers it with ctx under the durable or volatile 2PC protocol id. */
+ protected void register(boolean durable, CoordinationContext ctx)
+ throws RemoteException {
+ long timeout = 0; // stays 0 when the context carries no Expires element
+ Expires ex = ctx.getExpires();
+ if (ex != null)
+ timeout = ex.get_value().longValue(); // unit is whatever Expires carries - TODO confirm
+ EndpointReference epr = ParticipantService.getInstance().getParticipantService(
+ this, timeout);
+ eprOfCoordinator = ctx.register(
+ durable ? ATCoordinator.PROTOCOL_ID_DURABLE_2PC
+ : ATCoordinator.PROTOCOL_ID_VOLATILE_2PC, epr);
+ }
+ /** Builds a new CoordinatorStub for eprOfCoordinator on every call; any failure is printed and rethrown unchecked. */
+ protected CoordinatorPortType getCoordinator() {
+ try {
+ return new CoordinatorStub(eprOfCoordinator);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ /** Handles the Prepare notification; the parameters argument is not read. */
+ public synchronized void prepareOperation(Notification parameters)
+ throws RemoteException {
+ switch (getStatus()) {
+ case AT2PCStatus.NONE: // replies aborted without touching local state
+ getCoordinator().abortedOperation(null);
+ return;
+
+ case AT2PCStatus.ACTIVE:
+ final CoordinatorPortType p = getCoordinator(); // final so the anonymous timer task below can capture it
+ try {
+ if (prepare() == XAResource.XA_RDONLY) { // read-only vote: forget, then report it
+ forget();
+ p.readOnlyOperation(null);
+ } else {
+ p.preparedOperation(null);
+ timer.schedule(new TimerTask() { // repeats the "prepared" reply every RETRY_DELAY_MILLIS
+ public void run() {
+ switch (getStatus()) { // runs on the timer thread, outside this object's monitor
+ case AT2PCStatus.NONE:
+ case AT2PCStatus.ACTIVE:
+ case AT2PCStatus.PREPARING:
+ case AT2PCStatus.ABORTING:
+ case AT2PCStatus.COMMITTING:
+ cancel(); // status is no longer PREPARED: stop the periodic resend
+ return;
+
+ case AT2PCStatus.PREPARED:
+ try {
+ p.preparedOperation(null);
+ } catch (RemoteException e) {
+ // TODO:
+ // identify wscoor:InvalidState Soap fault and stop
+ e.printStackTrace();
+ }
+ }
+ }
+ }, RETRY_DELAY_MILLIS, RETRY_DELAY_MILLIS);
+ }
+ } catch (XAException e) { // prepare() failed: forget, then reply aborted
+ forget();
+ p.abortedOperation(null);
+ }
+ return;
+
+ case AT2PCStatus.PREPARING:
+ return; // no reply is sent in this state
+
+ case AT2PCStatus.PREPARED: // replies "prepared" without calling prepare()
+ getCoordinator().preparedOperation(null);
+ return;
+
+ case AT2PCStatus.ABORTING:
+ forget();
+ getCoordinator().abortedOperation(null);
+ return;
+
+ case AT2PCStatus.COMMITTING: // no action: control falls out of the switch
+ }
+ }
+ /** Handles the Commit notification; the parameters argument is not read. */
+ public synchronized void commitOperation(Notification parameters)
+ throws RemoteException {
+ switch (getStatus()) {
+ case AT2PCStatus.NONE: // replies committed without calling commit()
+ getCoordinator().committedOperation(null);
+ return;
+
+ case AT2PCStatus.ACTIVE: // Commit received before PREPARED: roll back and reply aborted
+ case AT2PCStatus.PREPARING:
+ try {
+ rollback();
+ forget();
+ getCoordinator().abortedOperation(null);
+ } catch (XAException e) { // NOTE(review): only printed; no reply is sent on this path
+ e.printStackTrace();
+ }
+ return;
+
+ case AT2PCStatus.PREPARED:
+ try {
+ commit();
+ forget();
+ getCoordinator().committedOperation(null);
+ } catch (XAException e) { // NOTE(review): only printed; no reply is sent on this path
+ e.printStackTrace();
+ }
+ return;
+
+ case AT2PCStatus.ABORTING: // rejected with the shared invalid-state fault
+ throw Coordinator.INVALID_STATE_SOAP_FAULT;
+
+ case AT2PCStatus.COMMITTING: // no action: control falls out of the switch
+ }
+ }
+ /** Handles the Rollback notification; the parameters argument is not read. */
+ public synchronized void rollbackOperation(Notification parameters)
+ throws RemoteException {
+ switch (getStatus()) {
+ case AT2PCStatus.NONE: // replies aborted without calling rollback()
+ getCoordinator().abortedOperation(null);
+ return;
+
+ case AT2PCStatus.ACTIVE:
+ case AT2PCStatus.PREPARING:
+ case AT2PCStatus.PREPARED:
+ try {
+ rollback();
+ forget();
+ getCoordinator().abortedOperation(null);
+ } catch (XAException e) { // NOTE(review): only printed; no reply is sent on this path
+ e.printStackTrace();
+ }
+ return;
+
+ case AT2PCStatus.ABORTING: // replies aborted without calling rollback() again
+ forget();
+ getCoordinator().abortedOperation(null);
+ return;
+
+ case AT2PCStatus.COMMITTING: // rejected with the shared invalid-state fault
+ throw Coordinator.INVALID_STATE_SOAP_FAULT;
+ }
+ }
+ /** Timeout hook: returns silently in state NONE, otherwise attempts a rollback and then throws TimedOutException. */
+ public void timeout() throws TimedOutException {
+ System.out.println("[AbstractParticipant] timeout "
+ + AT2PCStatus.getStatusName(getStatus()));
+ if (getStatus() == AT2PCStatus.NONE)
+ return;
+ try {
+ rollback();
+ forget();
+ getCoordinator().abortedOperation(null);
+ } catch (Exception e) { // best effort: a failed rollback or reply is printed and the timeout is still raised
+ e.printStackTrace();
+ }
+ throw new TimedOutException();
+ }
+}
\ No newline at end of file
Added: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java?rev=365396&view=auto
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java (added)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionImpl.java Mon Jan 2 11:08:53 2006
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.ws.transaction.coordinator.at;
+
+import java.rmi.RemoteException;
+
+import javax.transaction.RollbackException;
+
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.CoordinationContext;
+import org.apache.ws.transaction.coordinator.ParticipantService;
+import org.apache.ws.transaction.coordinator.TimedOutException;
+import org.apache.ws.transaction.utility.Callback;
+import org.apache.ws.transaction.wsat.CompletionInitiatorPortType;
+import org.apache.ws.transaction.wsat.Notification;
+import org.apache.ws.transaction.wscoor.Expires;
+
+/**
+ * @author Dasarath Weeratunge
+ */
+public class TransactionImpl {
+
+ private CoordinationContext ctx;
+
+ private EndpointReference eprOfCompletionCoordinator;
+
+ private CompletionInitiatorCallback callback;
+
+ private boolean aborted = false;
+
+ private boolean committed = false;
+
+ private boolean timedOut = false;
+
+ private boolean canInitiateCompletion = false;
+
+ protected TransactionImpl(CoordinationContext ctx) {
+ this.ctx = ctx;
+ }
+
+ private void register() throws RemoteException {
+ long timeout = 0;
+ Expires ex = ctx.getExpires();
+ if (ex != null)
+ timeout = ex.get_value().longValue();
+ callback = new CompletionInitiatorCallback();
+ eprOfCompletionCoordinator = ctx.register(
+ ATCoordinator.PROTOCOL_ID_COMPLETION,
+ ParticipantService.getInstance().getCompletionInitiatorService(
+ callback, timeout));
+ canInitiateCompletion = true;
+ }
+
+ private class CompletionInitiatorCallback implements
+ CompletionInitiatorPortType, Callback {
+ public synchronized void committedOperation(Notification parameters)
+ throws RemoteException {
+ committed = true;
+ notify();
+ }
+
+ public synchronized void abortedOperation(Notification parameters)
+ throws RemoteException {
+ aborted = true;
+ notify();
+ }
+
+ public synchronized void timeout() {
+ timedOut = true;
+ notify();
+ }
+ }
+
+ protected CoordinationContext getCoordinationContex() {
+ return ctx;
+ }
+
+ public void enlistParticipant(boolean durable,
+ AbstractParticipant participant) throws RemoteException {
+
+ TransactionManagerImpl tm = TransactionManagerImpl.getInstance();
+ TransactionImpl tx = tm.suspend();
+
+ participant.register(durable, ctx);
+
+ tm.resume(tx);
+ }
+
+ public void rollback() throws RemoteException {
+ TransactionManagerImpl tm = TransactionManagerImpl.getInstance();
+ TransactionImpl tx = tm.suspend();
+
+ if (!canInitiateCompletion)
+ register();
+
+ try {
+ synchronized (callback) {
+ if (!aborted) {
+ if (committed)
+ throw new IllegalStateException();
+ new CompletionCoordinatorStub(eprOfCompletionCoordinator).rollbackOperation(null);
+ callback.wait();
+ }
+ }
+ if (timedOut)
+ throw new TimedOutException();
+ if (!aborted)
+ throw new RollbackException();
+ } catch (RemoteException e) {
+ throw e;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ tm.resume(tx);
+ }
+ }
+
+ public void commit() throws RemoteException {
+ TransactionManagerImpl tm = TransactionManagerImpl.getInstance();
+ TransactionImpl tx = tm.suspend();
+
+ if (!canInitiateCompletion)
+ register();
+
+ try {
+ synchronized (callback) {
+ if (!committed) {
+ if (aborted)
+ throw new IllegalStateException();
+ new CompletionCoordinatorStub(eprOfCompletionCoordinator).commitOperation(null);
+ callback.wait();
+ }
+ }
+ if (timedOut)
+ throw new TimedOutException();
+ if (!committed)
+ throw new RollbackException();
+ } catch (RemoteException e) {
+ throw e;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ tm.resume(tx);
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org