You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/07 06:33:42 UTC
svn commit: r366671 - in
/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at:
ATCoordinatorImpl.java CompletionCoordinatorStub.java CoordinatorImpl.java
TransactionManagerImpl.java
Author: dasarath
Date: Fri Jan 6 21:33:39 2006
New Revision: 366671
URL: http://svn.apache.org/viewcvs?rev=366671&view=rev
Log: (empty)
Modified:
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java Fri Jan 6 21:33:39 2006
@@ -4,6 +4,7 @@
*/
package org.apache.ws.transaction.coordinator.at;
+import java.net.MalformedURLException;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -13,17 +14,18 @@
import java.util.Map;
import java.util.Set;
+import javax.xml.soap.Name;
+
import org.apache.axis.AxisFault;
-import org.apache.axis.MessageContext;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.MessageElement;
import org.apache.axis.message.addressing.AddressingHeaders;
-import org.apache.axis.message.addressing.Constants;
import org.apache.axis.message.addressing.EndpointReference;
import org.apache.axis.types.URI.MalformedURIException;
import org.apache.ws.transaction.coordinator.CoordinationService;
import org.apache.ws.transaction.coordinator.CoordinatorImpl;
+import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
import org.apache.ws.transaction.coordinator.TimedOutException;
import org.apache.ws.transaction.wsat.Notification;
@@ -53,36 +55,47 @@
super(COORDINATION_TYPE_ID);
}
- public EndpointReference register(String prot, EndpointReference pps)
- throws AxisFault {
+ public EndpointReference register(String protocol,
+ EndpointReference participantProtocolService)
+ throws InvalidCoordinationProtocolException {
+
if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
throw new IllegalStateException();
+
CoordinationService cs = CoordinationService.getInstance();
- String ref = null;
+ String participantRef = null;
EndpointReference epr = null;
- if (prot.equals(PROTOCOL_ID_COMPLETION)) {
- if (pps != null)
- completionParticipants.add(pps);
+
+ if (protocol.equals(PROTOCOL_ID_COMPLETION)) {
+ if (participantProtocolService != null)
+ completionParticipants.add(participantProtocolService);
+
epr = cs.getCompletionCoordinatorService(this);
} else {
- if (pps == null)
+ if (participantProtocolService == null)
throw new IllegalArgumentException();
+
UUIDGen gen = UUIDGenFactory.getUUIDGen();
- ref = "uuid:" + gen.nextUUID();
- if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
- volatile2PCParticipants.put(ref, pps);
- else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
- durable2PCParticipants.put(ref, pps);
+ participantRef = "uuid:" + gen.nextUUID();
+
+ if (protocol.equals(PROTOCOL_ID_VOLATILE_2PC))
+ volatile2PCParticipants.put(participantRef,
+ participantProtocolService);
+ else if (protocol.equals(PROTOCOL_ID_DURABLE_2PC))
+ durable2PCParticipants.put(participantRef,
+ participantProtocolService);
else
- throw INVALID_PROTOCOL_SOAP_FAULT;
- epr = cs.getCoordinatorService(this, ref);
+ throw new InvalidCoordinationProtocolException();
+
+ epr = cs.getCoordinatorService(this, participantRef);
}
+
return epr;
}
- public void forget2PC(String ref) {
- if (volatile2PCParticipants.remove(ref) == null)
- durable2PCParticipants.remove(ref);
+ public void forget2PC(String participantRef) {
+ if (volatile2PCParticipants.remove(participantRef) == null)
+ durable2PCParticipants.remove(participantRef);
}
public void rollback() {
@@ -100,46 +113,53 @@
}
}
- public void aborted(String ref) throws AxisFault {
+ public void aborted(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
- forget2PC(ref);
+ forget2PC(participantRef);
rollback();
return;
case AT2PCStatus.COMMITTING:
- throw INVALID_STATE_SOAP_FAULT;
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ return;
case AT2PCStatus.ABORTING:
- forget2PC(ref);
+ forget2PC(participantRef);
return;
case AT2PCStatus.NONE:
}
}
- public void readOnly(String ref) throws AxisFault {
+ public void readOnly(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
- forget2PC(ref);
+ forget2PC(participantRef);
return;
case AT2PCStatus.COMMITTING:
- throw INVALID_STATE_SOAP_FAULT;
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ return;
case AT2PCStatus.ABORTING:
- forget2PC(ref);
+ forget2PC(participantRef);
return;
case AT2PCStatus.NONE:
}
}
- public void replay(String ref) throws AxisFault {
+ private ParticipantStub getParticipantStub(String participantRef,
+ EndpointReference epr) throws AxisFault, MalformedURLException {
+ return new ParticipantStub(this, participantRef, epr);
+ }
+
+ public void replay(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
@@ -148,111 +168,131 @@
return;
case AT2PCStatus.COMMITTING:
- EndpointReference epr = getEprOf2PCParticipant(ref);
+ EndpointReference epr = getEprToRespond(participantRef);
if (epr != null)
try {
- new ParticipantStub(epr).commitOperation(null);
+ getParticipantStub(participantRef, epr).commitOperation(
+ null);
} catch (Exception e) {
e.printStackTrace();
}
return;
case AT2PCStatus.ABORTING:
- epr = getEprOf2PCParticipant(ref);
+ epr = getEprToRespond(participantRef);
if (epr != null)
try {
- new ParticipantStub(epr).rollbackOperation(null);
+ getParticipantStub(participantRef, epr).rollbackOperation(
+ null);
} catch (Exception e) {
e.printStackTrace();
}
return;
case AT2PCStatus.NONE:
- if (volatile2PCParticipants.containsKey(ref))
- throw INVALID_STATE_SOAP_FAULT;
- epr = (EndpointReference) durable2PCParticipants.get(ref);
- if (epr != null)
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ if (volatile2PCParticipants.containsKey(participantRef))
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ else {
+ epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+ if (epr == null)
+ epr = getReplyToEpr();
+ if (epr != null)
+ try {
+ getParticipantStub(participantRef, epr).rollbackOperation(
+ null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}
}
- public void prepared(String ref) throws AxisFault {
+ public void prepared(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
- rollback();
- throw INVALID_STATE_SOAP_FAULT;
+ try {
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ } finally {
+ rollback();
+ }
+ return;
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
- preparedParticipants.add(ref);
+ preparedParticipants.add(participantRef);
return;
case AT2PCStatus.COMMITTING:
- EndpointReference epr = getEprOf2PCParticipant(ref);
+ EndpointReference epr = getEprToRespond(participantRef);
if (epr != null)
try {
- new ParticipantStub(epr).commitOperation(null);
+ getParticipantStub(participantRef, epr).commitOperation(
+ null);
} catch (Exception e) {
e.printStackTrace();
}
return;
case AT2PCStatus.ABORTING:
- if (volatile2PCParticipants.remove(ref) != null)
- throw INVALID_STATE_SOAP_FAULT;
- epr = (EndpointReference) durable2PCParticipants.remove(ref);
- if (epr != null) {
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
+ if (volatile2PCParticipants.remove(participantRef) != null)
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ else {
+ epr = (EndpointReference) durable2PCParticipants.remove(participantRef);
+ if (epr == null)
+ epr = getReplyToEpr();
+ if (epr != null) {
+ try {
+ getParticipantStub(participantRef, epr).rollbackOperation(
+ null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
return;
case AT2PCStatus.NONE:
- if (volatile2PCParticipants.containsKey(ref))
- throw INVALID_STATE_SOAP_FAULT;
- epr = (EndpointReference) durable2PCParticipants.get(ref);
- if (epr != null)
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ if (volatile2PCParticipants.containsKey(participantRef))
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ else {
+ epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+ if (epr == null)
+ epr = getReplyToEpr();
+ if (epr != null)
+ try {
+ getParticipantStub(participantRef, epr).rollbackOperation(
+ null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
}
}
- public void committed(String ref) throws AxisFault {
+ public void committed(String participantRef) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
- rollback();
- throw INVALID_STATE_SOAP_FAULT;
+ try {
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ } finally {
+ rollback();
+ }
+ return;
case AT2PCStatus.COMMITTING:
- forget2PC(ref);
+ forget2PC(participantRef);
return;
case AT2PCStatus.ABORTING:
- throw INVALID_STATE_SOAP_FAULT;
+ throwFault(participantRef, INVALID_STATE_SOAP_FAULT);
+ return;
case AT2PCStatus.NONE:
}
}
- private EndpointReference getEprOf2PCParticipant(String ref) {
- EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(ref);
- if (epr != null)
- return epr;
- return (EndpointReference) durable2PCParticipants.get(ref);
- }
-
private boolean prepare(Map participants) {
int iters = 0;
int status_old = status;
@@ -261,12 +301,15 @@
if (iters++ > 0)
pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
- Iterator iter = participants.values().iterator();
+ Iterator iter = participants.keySet().iterator();
while (iter.hasNext()) {
if (status == AT2PCStatus.ABORTING)
return false;
try {
- new ParticipantStub((EndpointReference) iter.next()).prepareOperation(null);
+ String participantRef = (String) iter.next();
+ getParticipantStub(participantRef,
+ (EndpointReference) participants.get(participantRef)).prepareOperation(
+ null);
} catch (Exception e) {
e.printStackTrace();
}
@@ -334,11 +377,13 @@
Map participants = volatile2PCParticipants;
while (true) {
- Iterator iter = participants.values().iterator();
+ Iterator iter = participants.keySet().iterator();
while (iter.hasNext())
try {
- ParticipantStub p = new ParticipantStub(
- (EndpointReference) iter.next());
+ String participantRef = (String) iter.next();
+ ParticipantStub p = getParticipantStub(
+ participantRef,
+ (EndpointReference) participants.get(participantRef));
if (status == AT2PCStatus.ABORTING)
p.rollbackOperation(null);
else
@@ -399,9 +444,21 @@
replay(getParticipantRef());
}
- private AddressingHeaders getAddressingHeaders() {
- return (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
- Constants.ENV_ADDRESSING_REQUEST_HEADERS);
+ private EndpointReference getEprToSendFault(String participantRef) {
+ EndpointReference epr = getFaultToEpr();
+ if (epr != null)
+ return epr;
+ return getEprToRespond(participantRef);
+ }
+
+ private EndpointReference getEprToRespond(String participantRef) {
+ EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(participantRef);
+ if (epr != null)
+ return epr;
+ epr = (EndpointReference) durable2PCParticipants.get(participantRef);
+ if (epr != null)
+ return epr;
+ return getReplyToEpr();
}
private String getParticipantRef() {
@@ -423,10 +480,31 @@
public synchronized void timeout() throws TimedOutException {
System.out.println("[ATCoordinatorImpl] timeout "
+ AT2PCStatus.getStatusName(status));
+
if (status != AT2PCStatus.NONE) {
maxRetries = 3;
rollback();
throw new TimedOutException();
}
}
+
+ private void throwFault(String participantRef, AxisFault fault)
+ throws AxisFault {
+ throwFault(getEprToSendFault(participantRef), fault);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ws.transaction.coordinator.Callback#onFault(javax.xml.soap.Name)
+ */
+ public synchronized void onFault(Name code) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public EndpointReference getEndpointReference() {
+ throw new UnsupportedOperationException();
+ }
+
}
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CompletionCoordinatorStub.java Fri Jan 6 21:33:39 2006
@@ -20,6 +20,10 @@
import java.net.URL;
import org.apache.axis.AxisFault;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.ws.transaction.coordinator.Callback;
+import org.apache.ws.transaction.coordinator.CoordinationService;
+import org.apache.ws.transaction.utility.AddressingHeaders;
import org.apache.ws.transaction.utility.Service;
import org.apache.ws.transaction.utility.TCPSnifferHelper;
import org.apache.ws.transaction.wsat.CompletionCoordinatorBindingStub;
@@ -31,10 +35,17 @@
*/
public class CompletionCoordinatorStub extends CompletionCoordinatorBindingStub {
- public CompletionCoordinatorStub(
- org.apache.axis.message.addressing.EndpointReference epr)
+ public CompletionCoordinatorStub(Callback callback, EndpointReference epr)
throws AxisFault, MalformedURLException {
super(new URL(TCPSnifferHelper.redirect(epr.getAddress().toString())),
- new Service(epr));
+ new Service());
+
+ CoordinationService cs = CoordinationService.getInstance();
+
+ AddressingHeaders headers = new AddressingHeaders(epr,
+ callback.getEndpointReference());
+ headers.setFaultTo(cs.getFaultDispatcherService(callback));
+ ((Service) service).setAddressingHeaders(headers);
+
}
}
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/CoordinatorImpl.java Fri Jan 6 21:33:39 2006
@@ -6,7 +6,7 @@
import java.rmi.RemoteException;
-import org.apache.ws.transaction.utility.CallbackRegistry;
+import org.apache.ws.transaction.coordinator.CallbackRegistry;
import org.apache.ws.transaction.wsat.CoordinatorPortType;
import org.apache.ws.transaction.wsat.Notification;
@@ -18,31 +18,31 @@
public void preparedOperation(Notification params) throws RemoteException {
ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
- CallbackRegistry.COORDINATOR_REF, false);
+ CallbackRegistry.CALLBACK_REF, false);
c.preparedOperation(params);
}
public void abortedOperation(Notification params) throws RemoteException {
ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
- CallbackRegistry.COORDINATOR_REF, false);
+ CallbackRegistry.CALLBACK_REF, false);
c.abortedOperation(params);
}
public void readOnlyOperation(Notification params) throws RemoteException {
ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
- CallbackRegistry.COORDINATOR_REF, false);
+ CallbackRegistry.CALLBACK_REF, false);
c.readOnlyOperation(params);
}
public void committedOperation(Notification params) throws RemoteException {
ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
- CallbackRegistry.COORDINATOR_REF, false);
+ CallbackRegistry.CALLBACK_REF, false);
c.committedOperation(params);
}
public void replayOperation(Notification params) throws RemoteException {
ATCoordinator c = (ATCoordinator) CallbackRegistry.getInstance().correlateMessage(
- CallbackRegistry.COORDINATOR_REF, false);
+ CallbackRegistry.CALLBACK_REF, false);
c.replayOperation(params);
}
}
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java?rev=366671&r1=366670&r2=366671&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/TransactionManagerImpl.java Fri Jan 6 21:33:39 2006
@@ -37,7 +37,7 @@
}
public void begin() throws RemoteException {
- begin(CoordinationService.getInstance().getActivationService());
+ begin(CoordinationService.getInstance().getActivationCoordinatorService());
}
public void begin(EndpointReference epr) throws RemoteException {
---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org