You are viewing a plain text version of this content. The canonical link for it is here.
Posted to kandula-dev@ws.apache.org by da...@apache.org on 2006/01/02 20:08:01 UTC
svn commit: r365395 -
/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
Author: dasarath
Date: Mon Jan 2 11:07:59 2006
New Revision: 365395
URL: http://svn.apache.org/viewcvs?rev=365395&view=rev
Log: (empty)
Modified:
webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
Modified: webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java
URL: http://svn.apache.org/viewcvs/webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java?rev=365395&r1=365394&r2=365395&view=diff
==============================================================================
--- webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java (original)
+++ webservices/kandula/branches/Kandula_1/src/java/org/apache/ws/transaction/coordinator/at/ATCoordinatorImpl.java Mon Jan 2 11:07:59 2006
@@ -6,7 +6,6 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -14,8 +13,6 @@
import java.util.Map;
import java.util.Set;
-import javax.xml.namespace.QName;
-
import org.apache.axis.AxisFault;
import org.apache.axis.MessageContext;
import org.apache.axis.components.uuid.UUIDGen;
@@ -27,7 +24,6 @@
import org.apache.axis.types.URI.MalformedURIException;
import org.apache.ws.transaction.coordinator.CoordinationService;
import org.apache.ws.transaction.coordinator.CoordinatorImpl;
-import org.apache.ws.transaction.coordinator.InvalidCoordinationProtocolException;
import org.apache.ws.transaction.coordinator.TimedOutException;
import org.apache.ws.transaction.wsat.Notification;
@@ -37,19 +33,17 @@
*/
public class ATCoordinatorImpl extends CoordinatorImpl implements ATCoordinator {
- int status = AT2PCStatus.NONE;
-
- private static final int VOLATILE = 0;
+ int status = AT2PCStatus.ACTIVE;
- private static final int DURABLE = 1;
+ Set preparedParticipants = new HashSet();
- Map participants2PC[] = new Map[2];
+ List completionParticipants = new ArrayList();
- Set prepared = Collections.synchronizedSet(new HashSet());
+ Map volatile2PCParticipants = new HashMap();
- List participantsComp = Collections.synchronizedList(new ArrayList());
+ Map durable2PCParticipants = new HashMap();
- public static final int MAX_RETRIES = 10;
+ public static int maxRetries = 10;
public static final int RETRY_DELAY_MILLIS = 20 * 1000;
@@ -57,13 +51,10 @@
public ATCoordinatorImpl() throws MalformedURIException {
super(COORDINATION_TYPE_ID);
- status = AT2PCStatus.ACTIVE;
- for (int i = 0; i < 2; i++)
- participants2PC[i] = Collections.synchronizedMap(new HashMap());
}
public EndpointReference register(String prot, EndpointReference pps)
- throws InvalidCoordinationProtocolException {
+ throws AxisFault {
if (!(status == AT2PCStatus.ACTIVE || status == AT2PCStatus.PREPARING_VOLATILE))
throw new IllegalStateException();
CoordinationService cs = CoordinationService.getInstance();
@@ -71,7 +62,7 @@
EndpointReference epr = null;
if (prot.equals(PROTOCOL_ID_COMPLETION)) {
if (pps != null)
- participantsComp.add(pps);
+ completionParticipants.add(pps);
epr = cs.getCompletionCoordinatorService(this);
} else {
if (pps == null)
@@ -79,19 +70,19 @@
UUIDGen gen = UUIDGenFactory.getUUIDGen();
ref = "uuid:" + gen.nextUUID();
if (prot.equals(PROTOCOL_ID_VOLATILE_2PC))
- participants2PC[VOLATILE].put(ref, pps);
+ volatile2PCParticipants.put(ref, pps);
else if (prot.equals(PROTOCOL_ID_DURABLE_2PC))
- participants2PC[DURABLE].put(ref, pps);
+ durable2PCParticipants.put(ref, pps);
else
- throw new InvalidCoordinationProtocolException();
+ throw INVALID_PROTOCOL_SOAP_FAULT;
epr = cs.getCoordinatorService(this, ref);
}
return epr;
}
public void forget2PC(String ref) {
- if (participants2PC[VOLATILE].remove(ref) == null)
- participants2PC[DURABLE].remove(ref);
+ if (volatile2PCParticipants.remove(ref) == null)
+ durable2PCParticipants.remove(ref);
}
public void rollback() {
@@ -109,7 +100,7 @@
}
}
- public void aborted(String ref) {
+ public void aborted(String ref) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
@@ -119,7 +110,7 @@
return;
case AT2PCStatus.COMMITTING:
- throw new IllegalStateException();
+ throw INVALID_STATE_SOAP_FAULT;
case AT2PCStatus.ABORTING:
forget2PC(ref);
@@ -129,7 +120,7 @@
}
}
- public void readOnly(String ref) {
+ public void readOnly(String ref) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
@@ -138,7 +129,7 @@
return;
case AT2PCStatus.COMMITTING:
- throw new IllegalStateException();
+ throw INVALID_STATE_SOAP_FAULT;
case AT2PCStatus.ABORTING:
forget2PC(ref);
@@ -148,7 +139,7 @@
}
}
- public void replay(String ref) {
+ public void replay(String ref) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
@@ -157,110 +148,120 @@
return;
case AT2PCStatus.COMMITTING:
- EndpointReference epr = getEpr(ref);
- try {
- new ParticipantStub(epr).commitOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ EndpointReference epr = getEprOf2PCParticipant(ref);
+ if (epr != null)
+ try {
+ new ParticipantStub(epr).commitOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return;
case AT2PCStatus.ABORTING:
- epr = getEpr(ref);
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ epr = getEprOf2PCParticipant(ref);
+ if (epr != null)
+ try {
+ new ParticipantStub(epr).rollbackOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return;
case AT2PCStatus.NONE:
- epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
+ if (volatile2PCParticipants.containsKey(ref))
+ throw INVALID_STATE_SOAP_FAULT;
+ epr = (EndpointReference) durable2PCParticipants.get(ref);
if (epr != null)
- throw new IllegalStateException();
- epr = (EndpointReference) participants2PC[DURABLE].get(ref);
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ try {
+ new ParticipantStub(epr).rollbackOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
- public void prepared(String ref) {
+ public void prepared(String ref) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
rollback();
- throw new IllegalStateException();
+ throw INVALID_STATE_SOAP_FAULT;
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
- prepared.add(ref);
+ preparedParticipants.add(ref);
return;
case AT2PCStatus.COMMITTING:
- EndpointReference epr = getEpr(ref);
- try {
- new ParticipantStub(epr).commitOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ EndpointReference epr = getEprOf2PCParticipant(ref);
+ if (epr != null)
+ try {
+ new ParticipantStub(epr).commitOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
return;
case AT2PCStatus.ABORTING:
- case AT2PCStatus.NONE:
- epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
+ if (volatile2PCParticipants.remove(ref) != null)
+ throw INVALID_STATE_SOAP_FAULT;
+ epr = (EndpointReference) durable2PCParticipants.remove(ref);
if (epr != null) {
- if (status == AT2PCStatus.ABORTING)
- forget2PC(ref);
- throw new IllegalStateException();
- }
- epr = (EndpointReference) participants2PC[DURABLE].get(ref);
- if (status == AT2PCStatus.ABORTING)
- forget2PC(ref);
- try {
- new ParticipantStub(epr).rollbackOperation(null);
- } catch (Exception e) {
- e.printStackTrace();
+ try {
+ new ParticipantStub(epr).rollbackOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
+ return;
+
+ case AT2PCStatus.NONE:
+ if (volatile2PCParticipants.containsKey(ref))
+ throw INVALID_STATE_SOAP_FAULT;
+ epr = (EndpointReference) durable2PCParticipants.get(ref);
+ if (epr != null)
+ try {
+ new ParticipantStub(epr).rollbackOperation(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
- public void committed(String ref) {
+ public void committed(String ref) throws AxisFault {
switch (status) {
case AT2PCStatus.ACTIVE:
case AT2PCStatus.PREPARING_VOLATILE:
case AT2PCStatus.PREPARING_DURABLE:
rollback();
- throw new IllegalStateException();
+ throw INVALID_STATE_SOAP_FAULT;
case AT2PCStatus.COMMITTING:
forget2PC(ref);
return;
case AT2PCStatus.ABORTING:
- throw new IllegalStateException();
+ throw INVALID_STATE_SOAP_FAULT;
case AT2PCStatus.NONE:
}
}
- private EndpointReference getEpr(String ref) {
- EndpointReference epr = (EndpointReference) participants2PC[VOLATILE].get(ref);
- if (epr == null)
- epr = (EndpointReference) participants2PC[DURABLE].get(ref);
- return epr;
+ private EndpointReference getEprOf2PCParticipant(String ref) {
+ EndpointReference epr = (EndpointReference) volatile2PCParticipants.get(ref);
+ if (epr != null)
+ return epr;
+ return (EndpointReference) durable2PCParticipants.get(ref);
}
- private boolean prepare(int prot) {
+ private boolean prepare(Map participants) {
int iters = 0;
int status_old = status;
- while (iters < MAX_RETRIES) {
+ while (iters < maxRetries) {
if (iters++ > 0)
pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
- Iterator iter = participants2PC[prot].values().iterator();
+ Iterator iter = participants.values().iterator();
while (iter.hasNext()) {
if (status == AT2PCStatus.ABORTING)
return false;
@@ -273,7 +274,7 @@
pause(RESPONSE_DELAY_MILLIS);
- if (prepared.containsAll(participants2PC[prot].keySet()))
+ if (preparedParticipants.containsAll(participants.keySet()))
return status == status_old;
}
@@ -282,11 +283,11 @@
private boolean prepare() {
status = AT2PCStatus.PREPARING_VOLATILE;
- if (!prepare(VOLATILE))
+ if (!prepare(volatile2PCParticipants))
return false;
status = AT2PCStatus.PREPARING_DURABLE;
- return prepare(DURABLE);
+ return prepare(durable2PCParticipants);
}
public void commit() {
@@ -319,16 +320,21 @@
}
}
+ private boolean noParticipantsToTerminate() {
+ return volatile2PCParticipants.isEmpty()
+ && durable2PCParticipants.isEmpty();
+ }
+
private void terminate() {
int iters = 0;
- while (iters < MAX_RETRIES
- && !(participants2PC[VOLATILE].isEmpty() && participants2PC[DURABLE].isEmpty())) {
+ while (iters < maxRetries && !noParticipantsToTerminate()) {
+
if (iters++ > 0)
pause(RETRY_DELAY_MILLIS - RESPONSE_DELAY_MILLIS);
- for (int prot = VOLATILE; prot == VOLATILE || prot == DURABLE; prot = prot == VOLATILE ? DURABLE
- : -1) {
- Iterator iter = participants2PC[prot].values().iterator();
+ Map participants = volatile2PCParticipants;
+ while (true) {
+ Iterator iter = participants.values().iterator();
while (iter.hasNext())
try {
ParticipantStub p = new ParticipantStub(
@@ -340,14 +346,18 @@
} catch (Exception e) {
e.printStackTrace();
}
+ if (participants == volatile2PCParticipants)
+ participants = durable2PCParticipants;
+ else
+ break;
}
pause(RESPONSE_DELAY_MILLIS);
}
- if (participants2PC[VOLATILE].isEmpty()
- && participants2PC[DURABLE].isEmpty()) {
- Iterator iter = participantsComp.iterator();
+ if (noParticipantsToTerminate()) {
+
+ Iterator iter = completionParticipants.iterator();
while (iter.hasNext())
try {
CompletionInitiatorStub p = new CompletionInitiatorStub(
@@ -364,20 +374,9 @@
status = AT2PCStatus.NONE;
}
- private AxisFault getInvalidStateSoapFault() {
- QName subcode = new QName(
- "http://schemas.xmlsoap.org/ws/2004/10/wscoor", "InvalidState");
- String faultString = "The message was invalid for the current state of the activity.";
- return new AxisFault(subcode, faultString, null, null);
- }
-
public synchronized void preparedOperation(Notification parameters)
throws RemoteException {
- try {
- prepared(getParticipantRef());
- } catch (IllegalStateException e) {
- throw getInvalidStateSoapFault();
- }
+ prepared(getParticipantRef());
}
public synchronized void abortedOperation(Notification parameters)
@@ -400,10 +399,14 @@
replay(getParticipantRef());
}
- private synchronized String getParticipantRef() {
- AddressingHeaders header = (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
+ private AddressingHeaders getAddressingHeaders() {
+ return (AddressingHeaders) MessageContext.getCurrentContext().getProperty(
Constants.ENV_ADDRESSING_REQUEST_HEADERS);
- MessageElement e = header.getReferenceProperties().get(PARTICIPANT_REF);
+ }
+
+ private String getParticipantRef() {
+ AddressingHeaders headers = getAddressingHeaders();
+ MessageElement e = headers.getReferenceProperties().get(PARTICIPANT_REF);
return e.getValue();
}
@@ -417,12 +420,13 @@
rollback();
}
- public synchronized void timeout() {
+ public synchronized void timeout() throws TimedOutException {
System.out.println("[ATCoordinatorImpl] timeout "
+ AT2PCStatus.getStatusName(status));
- if (status == AT2PCStatus.NONE)
- return;
- rollback();
- throw new TimedOutException();
+ if (status != AT2PCStatus.NONE) {
+ maxRetries = 3;
+ rollback();
+ throw new TimedOutException();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: kandula-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: kandula-dev-help@ws.apache.org