You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/03/07 03:39:04 UTC
svn commit: r383751 [1/2] - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/ handlers/
msgprocessors/ util/ workers/ wsrm/
Author: chamikara
Date: Mon Mar 6 18:39:01 2006
New Revision: 383751
URL: http://svn.apache.org/viewcvs?rev=383751&view=rev
Log:
Add the AckFinal, AckNone, CloseSequence, CloseSequenceResponse elements as required by the new spec. Scenario 1.3 is working.
Corrected the fault handlong logic.
Bug fixes.
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/AckFinal.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/AckNone.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/CloseSequence.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/CloseSequenceResponse.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MsgInitializer.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/IOMRMPart.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/RMElements.java
webservices/sandesha/trunk/src/org/apache/sandesha2/wsrm/SequenceAcknowledgement.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Mon Mar 6 18:39:01 2006
@@ -21,19 +21,35 @@
import java.util.Collection;
import java.util.Iterator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaPropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
/**
* Contains logic for managing acknowledgements.
@@ -175,11 +191,209 @@
return completedMsgList;
}
- public static void sendSyncAck () {
+ public static RMMsgContext generateAckMessage (RMMsgContext referenceRMMessage, String sequenceID)throws SandeshaException {
+
+ MessageContext referenceMsg = referenceRMMessage.getMessageContext();
+
+ ConfigurationContext configurationContext = referenceRMMessage.getMessageContext().getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ //Setting the ack depending on AcksTo.
+ SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+ String acksToStr = acksTo.getAddress();
+
+ if (acksToStr == null)
+ throw new SandeshaException(
+ "acksToStr Seqeunce property is not set correctly");
+
+ AxisOperation ackOperation = null;
+
+ try {
+ ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+ } catch (AxisFault e) {
+ throw new SandeshaException("Could not create the Operation");
+ }
+
+ AxisOperation rmMsgOperation = referenceRMMessage.getMessageContext()
+ .getAxisOperation();
+ if (rmMsgOperation != null) {
+ ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
+ if (outFlow != null) {
+ ackOperation.setPhasesOutFlow(outFlow);
+ ackOperation.setPhasesOutFaultFlow(outFlow);
+ }
+ }
+
+ MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
+ referenceRMMessage, ackOperation);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+ ackRMMsgCtx.setRMNamespaceValue(referenceRMMessage.getRMNamespaceValue());
+
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(referenceMsg.getEnvelope()));
+
+ //Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+ try {
+ ackMsgCtx.setEnvelope(envelope);
+ } catch (AxisFault e3) {
+ throw new SandeshaException(e3.getMessage());
+ }
+
+ ackMsgCtx.setTo(acksTo);
+ ackMsgCtx.setReplyTo(referenceMsg.getTo());
+
+ //adding the SequenceAcknowledgement part.
+ RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceID);
+
+ if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
+
+// AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
+// .getConfigurationContext());
+
+ //setting CONTEXT_WRITTEN since acksto is anonymous
+ if (referenceRMMessage.getMessageContext().getOperationContext() == null) {
+ //operation context will be null when doing in a GLOBAL
+ // handler.
+ try {
+ AxisOperation op = AxisOperationFactory
+ .getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
+ OperationContext opCtx = new OperationContext(op);
+ referenceRMMessage.getMessageContext().setAxisOperation(op);
+ referenceRMMessage.getMessageContext().setOperationContext(opCtx);
+ } catch (AxisFault e2) {
+ throw new SandeshaException(e2.getMessage());
+ }
+ }
+
+ referenceRMMessage.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_TRUE);
+
+ referenceRMMessage.getMessageContext().setProperty(
+ Sandesha2Constants.ACK_WRITTEN, "true");
+// try {
+// engine.send(ackRMMsgCtx.getMessageContext());
+// } catch (AxisFault e1) {
+// throw new SandeshaException(e1.getMessage());
+// }
+ return ackRMMsgCtx;
+
+ } else {
+
+ Transaction asyncAckTransaction = storageManager.getTransaction();
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager
+ .getRetransmitterBeanMgr();
+
+ String key = SandeshaUtil.getUUID();
+
+ //dumping to the storage will be done be Sandesha2 Transport Sender
+ //storageManager.storeMessageContext(key,ackMsgCtx);
+
+ SenderBean ackBean = new SenderBean();
+ ackBean.setMessageContextRefKey(key);
+ ackBean.setMessageID(ackMsgCtx.getMessageID());
+ ackBean.setReSend(false);
+
+ //this will be set to true in the sender.
+ ackBean.setSend(true);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ //the internalSequenceId value of the retransmitter Table for the
+ // messages related to an incoming
+ //sequence is the actual sequence ID
+
+// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+
+// long ackInterval = PropertyManager.getInstance()
+// .getAcknowledgementInterval();
+
+ Parameter param = referenceMsg.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+
+ SandeshaPropertyBean propertyBean = null;
+ if (param!=null) {
+ propertyBean = (SandeshaPropertyBean) param.getValue();
+ }else {
+ propertyBean = PropertyManager.getInstance().getPropertyBean();
+ }
+
+
+ long ackInterval = propertyBean.getAcknowledgementInaterval();
+
+ // if (policyBean != null) {
+// ackInterval = policyBean.getAcknowledgementInaterval();
+// }
+
+ //Ack will be sent as stand alone, only after the retransmitter
+ // interval.
+ long timeToSend = System.currentTimeMillis() + ackInterval;
+
+ //removing old acks.
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ //this will be set to true in the sandesha2TransportSender.
+ findBean.setSend(true);
+ findBean.setReSend(false);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+
+ if (it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ timeToSend = oldAckBean.getTimeToSend(); //If there is an old ack. This ack will be sent in the old timeToSend.
+ retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+ }
+
+ ackBean.setTimeToSend(timeToSend);
+
+ storageManager.storeMessageContext(key,ackMsgCtx);
+
+ //inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
+ asyncAckTransaction.commit();
+
+ //passing the message through sandesha2sender
+
+ ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
+ ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+
+ ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
+
+// AxisEngine engine = new AxisEngine (configurationContext);
+// try {
+// engine.send(ackMsgCtx);
+// } catch (AxisFault e) {
+// throw new SandeshaException (e.getMessage());
+// }
+
+ RMMsgContext ackRMMessageCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+
+ SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
+
+ referenceMsg.pause();
+
+ return ackRMMessageCtx;
+ }
+
- }
-
- public static void sendAsyncAck () {
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Mon Mar 6 18:39:01 2006
@@ -17,6 +17,8 @@
package org.apache.sandesha2;
+import org.apache.axis2.addressing.AddressingConstants;
+
/**
* Contains all the Sandesha2Constants of Sandesha2.
@@ -31,8 +33,8 @@
public interface SPEC_VERSIONS {
- String WSRM = "Spec_2005_10";
- String WSRX = "Spec_2005_02";
+ String WSRM = "Spec_2005_02";
+ String WSRX = "Spec_2005_10";
}
public interface SPEC_2005_02 {
@@ -73,8 +75,14 @@
String ACTION_TERMINATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequence";
+ String ACTION_TERMINATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequenceResponse";
+
+ String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200602/AckRequested";
+
String ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
+ String ACTION_CLOSE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequenceResponse";
+
String SOAP_ACTION_CREATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CreateSequence";
String SOAP_ACTION_CREATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CreateSequenceResponse";
@@ -83,6 +91,8 @@
String SOAP_ACTION_TERMINATE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequence";
+ String SOAP_ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200510/AckRequested";
+
String SOAP_ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
}
}
@@ -103,6 +113,10 @@
String TERMINATE_SEQUENCE = "TerminateSequence";
+ String CLOSE_SEQUENCE = "CloseSequence";
+
+ String CLOSE_SEQUENCE_RESPONSE = "CloseSequenceResponse";
+
String TERMINATE_SEQUENCE_RESPONSE = "TerminateSequenceResponse";
String FAULT_CODE = "FaultCode";
@@ -132,12 +146,16 @@
String IDENTIFIER = "Identifier";
String ACCEPT = "Accept";
+
+ String NONE = "None";
+
+ String FINAL = "Final";
}
public interface WSA {
- String NS_URI_ANONYMOUS = "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous";
+ String NS_URI_ANONYMOUS = AddressingConstants.Final.WSA_ANONYMOUS_URL; // "http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous";
- String NS_URI_ADDRESSING = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+ String NS_URI_ADDRESSING = AddressingConstants.Final.WSA_NAMESPACE; //"http://schemas.xmlsoap.org/ws/2004/08/addressing";
String NS_PREFIX_ADDRESSING = "wsa";
@@ -163,11 +181,15 @@
int CLOSE_SEQUENCE = 5;
- int TERMINATE_SEQ = 6;
+ int CLOSE_SEQUENCE_RESPONSE = 6;
- int TERMINATE_SEQ_RESPONSE = 7;
+ int TERMINATE_SEQ = 7;
+
+ int ACK_REQUEST = 8;
+
+ int TERMINATE_SEQ_RESPONSE = 9;
- int MAX_MESSAGE_TYPE = 7;
+ int MAX_MESSAGE_TYPE = 9;
}
public interface MessageParts {
@@ -184,12 +206,16 @@
int CREATE_SEQ_RESPONSE = 10;
int TERMINATE_SEQ = 11;
+
+ int CLOSE_SEQUENCE = 12;
+
+ int CLOSE_SEQUENCE_RESPONSE = 13;
- int TERMINATE_SEQ_RESPONSE = 12;
+ int TERMINATE_SEQ_RESPONSE = 14;
- int ACK_REQUEST = 13;
+ int ACK_REQUEST = 15;
- int MAX_MSG_PART_ID = 13;
+ int MAX_MSG_PART_ID = 15;
}
public interface SequenceProperties {
@@ -243,6 +269,15 @@
String TRANSPORT_TO = "TransportTo";
String OUT_SEQ_ACKSTO = "OutSequenceAcksTo";
+
+ String SEQUENCE_CLOSED = "SequenceClosed";
+
+ String LAST_MESSAGE = "LastMessage";
+
+
+ String REQUEST_SIDE_SEQUENCE_ID = "RequestSideSequenceID"; //used only at the server side
+
+ String HIGHEST_MSG_NO = "HighestMessageNumber";
}
public interface SOAPVersion {
@@ -291,6 +326,8 @@
public interface Subcodes {
String SEQUENCE_TERMINATED = "wsrm:SequenceTerminated";
+
+ String SEQUENCE_CLOSED = "wsrm:SequenceClosed";
String UNKNOWN_SEQUENCE = "wsrm:UnknownSequence";
@@ -301,6 +338,7 @@
String LAST_MESSAGE_NO_EXCEEDED = "wsrm:LastMessageNumberExceeded";
String CREATE_SEQUENCE_REFUSED = "wsrm:CreateSequenceRefused";
+
}
@@ -313,7 +351,8 @@
public static final int INVALID_ACKNOWLEDGEMENT = 3;
public static final int CREATE_SEQUENCE_REFUSED = 4;
-
+
+ public static final int LAST_MESSAGE_NO_EXCEEDED = 5;
}
}
@@ -335,6 +374,8 @@
String MessageTypesToDrop = "MessageTypesToDrop";
+ String RetransmissionCount = "RetransmissionCount";
+
public interface DefaultValues {
int RetransmissionInterval = 20000;
@@ -352,6 +393,8 @@
boolean InvokeInOrder = true;
String MessageTypesToDrop=VALUE_NONE;
+
+ int RetransmissionCount = 8;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Mon Mar 6 18:39:01 2006
@@ -22,7 +22,6 @@
import org.apache.axis2.description.AxisDescription;
import org.apache.axis2.description.AxisModule;
import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterImpl;
import org.apache.axis2.description.PolicyInclude;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.modules.Module;
@@ -66,7 +65,7 @@
SandeshaPropertyBean defaultPropertyBean = PropertyManager.getInstance().getPropertyBean();
SandeshaPropertyBean axisDescPropertyBean = RMPolicyManager.loadPoliciesFromAxisDescription(axisDescription);
- Parameter parameter = new ParameterImpl ();
+ Parameter parameter = new Parameter ();
parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
if (axisDescPropertyBean==null) {
parameter.setValue(defaultPropertyBean);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SpecSpecificConstants.java Mon Mar 6 18:39:01 2006
@@ -2,7 +2,16 @@
public class SpecSpecificConstants {
- private static String unknowsSpecErrorMessage = "Unknown specification version";
+ private static String unknownSpecErrorMessage = "Unknown specification version";
+
+ public static String getSpecVersionString (String namespaceValue) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_2005_02.NS_URI.equals(namespaceValue))
+ return Sandesha2Constants.SPEC_VERSIONS.WSRM;
+ else if (Sandesha2Constants.SPEC_2005_10.NS_URI.equals(namespaceValue))
+ return Sandesha2Constants.SPEC_VERSIONS.WSRX;
+ else
+ throw new SandeshaException ("Unknows rm namespace value");
+ }
public static String getRMNamespaceValue (String specVersion) throws SandeshaException {
if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
@@ -10,7 +19,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.NS_URI;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getCreateSequenceAction (String specVersion) throws SandeshaException {
@@ -19,7 +28,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getCreateSequenceResponseAction (String specVersion) throws SandeshaException {
@@ -28,7 +37,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CREATE_SEQUENCE_RESPONSE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getTerminateSequenceAction (String specVersion) throws SandeshaException {
@@ -37,7 +46,25 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_TERMINATE_SEQUENCE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getCloseSequenceAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ throw new SandeshaException ("This rm spec version does not define a sequenceClose action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_CLOSE_SEQUENCE;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getAckRequestAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ throw new SandeshaException ("this spec version does not define a ackRequest action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_ACK_REQUEST;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getSequenceAcknowledgementAction (String specVersion) throws SandeshaException {
@@ -46,7 +73,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getCreateSequenceSOAPAction (String specVersion) throws SandeshaException {
@@ -55,7 +82,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getCreateSequenceResponseSOAPAction (String specVersion) throws SandeshaException {
@@ -64,7 +91,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getTerminateSequenceSOAPAction (String specVersion) throws SandeshaException {
@@ -73,7 +100,16 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_TERMINATE_SEQUENCE;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getAckRequestSOAPAction (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ throw new SandeshaException ("this spec version does not define a ackRequest SOAP action");
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_ACK_REQUEST;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static String getSequenceAcknowledgementSOAPAction (String specVersion) throws SandeshaException {
@@ -82,7 +118,7 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return Sandesha2Constants.SPEC_2005_10.Actions.SOAP_ACTION_SEQUENCE_ACKNOWLEDGEMENT;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
}
public static boolean isTerminateSequenceResponseRequired (String specVersion) throws SandeshaException {
@@ -91,6 +127,46 @@
else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
return true;
else
- throw new SandeshaException (unknowsSpecErrorMessage);
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isLastMessageIndicatorRequired (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ return true;
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return false;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isAckFinalAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isAckNoneAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static boolean isSequenceClosingAllowed (String specVersion) throws SandeshaException {
+ if (Sandesha2Constants.SPEC_VERSIONS.WSRM.equals(specVersion))
+ return false;
+ else if (Sandesha2Constants.SPEC_VERSIONS.WSRX.equals(specVersion))
+ return true;
+ else
+ throw new SandeshaException (unknownSpecErrorMessage);
+ }
+
+ public static String getDefaultSpecVersion () {
+ return Sandesha2Constants.SPEC_VERSIONS.WSRM;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Mon Mar 6 18:39:01 2006
@@ -24,9 +24,16 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.client.Sandesha2ClientAPI;
+import org.apache.sandesha2.client.SequenceReport;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
@@ -39,7 +46,9 @@
import org.apache.sandesha2.storage.beans.NextMsgBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaPropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
@@ -287,4 +296,7 @@
return deleatable;
}
+
+
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Mon Mar 6 18:39:01 2006
@@ -19,19 +19,36 @@
import java.util.ArrayList;
import java.util.Iterator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SpecSpecificConstants;
+import org.apache.sandesha2.TerminateManager;
+import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
+import com.sun.corba.se.internal.core.ServiceContext;
+
/**
* Contains all the Sandesha2Constants of Sandesha2.
* Please see sub-interfaces to see grouped data.
@@ -50,11 +67,12 @@
public static String SEQUENCE_KEY = "Sandesha2ClientAPIPropertySequenceKey";
public static String MESSAGE_NUMBER = "Sandesha2ClientAPIPropertyMessageNumber";
public static String RM_SPEC_VERSION = "Sandesha2ClientAPIPropertyRMSpecVersion";
+ public static String DUMMY_MESSAGE = "Sandesha2ClientAPIDummyMessage"; //If this property is set, even though this message will invoke the RM handlers, this will not be sent as an actual application message
+ public static String VALUE_TRUE = "true";
+ public static String VALUE_FALSE = "false";
- public static SequenceReport getOutgoingSequenceReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
-
- String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
- SequenceReport sequenceReport = new SequenceReport ();
+ public static SequenceReport getOutgoingSequenceReport (String internalSequenceID,ConfigurationContext configurationContext) throws SandeshaException {
+ SequenceReport sequenceReport = new SequenceReport ();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
SequencePropertyBeanMgr seqpPropMgr = storageManager.getSequencePropretyBeanMgr();
@@ -91,7 +109,14 @@
reportTransaction.commit();
- return sequenceReport;
+ return sequenceReport;
+ }
+
+ public static SequenceReport getOutgoingSequenceReport (String to, String sequenceKey,ConfigurationContext configurationContext) throws SandeshaException {
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID (to,sequenceKey);
+ return getOutgoingSequenceReport(internalSequenceID,configurationContext);
+
}
public static SequenceReport getIncomingSequenceReport (String sequenceID,ConfigurationContext configurationContext) throws SandeshaException {
@@ -121,11 +146,5 @@
return rmReport;
}
- public static String generateInternalSequenceIDForTheClientSide (String toEPR,String sequenceKey) {
- return SandeshaUtil.getInternalSequenceID(toEPR,sequenceKey);
- }
-
- public static void endSequence (String internalSequenceID, ConfigurationContext configurationContext) {
-
- }
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon Mar 6 18:39:01 2006
@@ -23,8 +23,10 @@
import javax.xml.namespace.QName;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.ws.commons.soap.SOAPBody;
@@ -61,18 +63,6 @@
if (!isRMGlobalMessage) {
return;
}
-
-
- FaultManager faultManager = new FaultManager();
- RMMsgContext faultMessageContext = faultManager
- .checkForPossibleFaults(msgContext);
- if (faultMessageContext != null) {
- ConfigurationContext configurationContext = msgContext
- .getConfigurationContext();
- AxisEngine engine = new AxisEngine(configurationContext);
- engine.send(faultMessageContext.getMessageContext());
- return;
- }
RMMsgContext rmMessageContext = MsgInitializer
.initializeMessage(msgContext);
@@ -179,6 +169,23 @@
}
}
+ }
+ }
+ } else if (rmMsgContext.getMessageType()!=Sandesha2Constants.MessageTypes.UNKNOWN) {
+ //droping other known message types if, an suitable operation context is not available,
+ //and if a relates to value is present.
+ RelatesTo relatesTo = rmMsgContext.getRelatesTo();
+ if (relatesTo!=null) {
+ String value = relatesTo.getValue();
+
+ //TODO do not drop, relationshipTypes other than reply
+
+ ConfigurationContext configurationContext = rmMsgContext.getMessageContext().getConfigurationContext();
+ OperationContext opCtx = configurationContext.getOperationContext(value);
+ if (opCtx==null) {
+ String message = "Dropping duplicate RM message";
+ log.debug(message);
+ drop=true;
}
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Mon Mar 6 18:39:01 2006
@@ -34,6 +34,7 @@
import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* This is invoked in the inFlow of an RM endpoint. This is responsible for selecting an suitable
@@ -64,15 +65,6 @@
if (null != DONE && "true".equals(DONE))
return;
- FaultManager faultManager = new FaultManager();
- RMMsgContext faultMessageContext = faultManager
- .checkForPossibleFaults(msgCtx);
- if (faultMessageContext != null) {
- AxisEngine engine = new AxisEngine(context);
- engine.send(faultMessageContext.getMessageContext());
- return;
- }
-
AxisService axisService = msgCtx.getAxisService();
if (axisService == null) {
String message = "AxisService is null";
@@ -89,6 +81,9 @@
throw new AxisFault(message);
}
+ if (rmMsgCtx.getMessageContext().getAxisOperation().getParent()==null) {
+ System.out.println("Operation parent was null for message:" + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
+ }
MsgProcessor msgProcessor = MsgProcessorFactory
.getMessageProcessor(rmMsgCtx.getMessageType());
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Mar 6 18:39:01 2006
@@ -30,7 +30,6 @@
import org.apache.axis2.description.AxisOperationFactory;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
-import org.apache.axis2.description.ParameterImpl;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.handlers.AbstractHandler;
@@ -106,7 +105,12 @@
msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
"true");
-
+
+ String dummyMessageString = (String) msgCtx.getOptions().getProperty(Sandesha2ClientAPI.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString!=null && Sandesha2ClientAPI.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(context);
@@ -123,7 +127,7 @@
if (policyParam == null) {
SandeshaPropertyBean propertyBean = PropertyManager.getInstance()
.getPropertyBean();
- Parameter parameter = new ParameterImpl();
+ Parameter parameter = new Parameter();
parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
parameter.setValue(propertyBean);
@@ -233,7 +237,8 @@
}
//saving the used message number
- setNextMsgNo(context,internalSequenceId,messageNumber);
+ if (!dummyMessage)
+ setNextMsgNo(context,internalSequenceId,messageNumber);
boolean sendCreateSequence = false;
@@ -419,7 +424,8 @@
}
// processing the response
- processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
+ if (!dummyMessage)
+ processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
msgCtx.pause(); // the execution will be stopped.
outHandlerTransaction.commit();
@@ -659,6 +665,11 @@
throw new SandeshaException(message);
}
+
+ //TODO check for highest msg no.
+ long requestMsgNo = requestSequence.getMessageNumber().getMessageNumber();
+
+
if (requestSequence.getLastMessage() != null) {
lastMessage = true;
sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
@@ -679,7 +690,15 @@
Object obj = msg.getProperty(Sandesha2ClientAPI.LAST_MESSAGE);
if (obj != null && "true".equals(obj)) {
lastMessage = true;
- sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+
+ SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
+ if (specVersionBean==null)
+ throw new SandeshaException ("Spec version bean is not set");
+ String specVersion = specVersionBean.getValue();
+
+ if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
+ sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
+
// saving the last message no.
SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
internalSequenceId,
@@ -693,8 +712,8 @@
AckRequested ackRequested = null;
boolean addAckRequested = false;
- if (!lastMessage)
- addAckRequested = true;
+ //if (!lastMessage)
+ addAckRequested = true; //TODO decide the policy to add the ackRequested tag
// setting the Sequnece id.
// Set send = true/false depending on the availability of the out
@@ -730,6 +749,7 @@
//Retransmitter bean entry for the application message
SenderBean appMsgEntry = new SenderBean();
String storageKey = SandeshaUtil.getUUID();
+
appMsgEntry.setMessageContextRefKey(storageKey);
appMsgEntry.setTimeToSend(System.currentTimeMillis());
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=383751&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Mar 6 18:39:01 2006
@@ -0,0 +1,268 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.PropertyManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaPropertyBean;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.AckRequested;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+
+/**
+ * Responsible for processing an incoming Application message.
+ *
+ * @author Chamikara Jayalath <ch...@gmail.com>
+ */
+public class AckRequestedProcessor implements MsgProcessor {
+
+ private Log log = LogFactory.getLog(getClass());
+
+ public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+
+ AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
+ if (ackRequested==null) {
+ throw new SandeshaException ("Message identified as of type ackRequested does not have an AckRequeted element");
+ }
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+
+ String sequenceID = ackRequested.getIdentifier().getIdentifier();
+
+ ConfigurationContext configurationContext = rmMsgCtx.getMessageContext().getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+
+ //Setting the ack depending on AcksTo.
+ SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceID,
+ Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
+ String acksToStr = acksTo.getAddress();
+
+ if (acksToStr == null)
+ throw new SandeshaException(
+ "acksToStr Seqeunce property is not set correctly");
+
+ AxisOperation ackOperation = null;
+
+ try {
+ ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+ } catch (AxisFault e) {
+ throw new SandeshaException("Could not create the Operation");
+ }
+
+ AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext()
+ .getAxisOperation();
+ if (rmMsgOperation != null) {
+ ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
+ if (outFlow != null) {
+ ackOperation.setPhasesOutFlow(outFlow);
+ ackOperation.setPhasesOutFaultFlow(outFlow);
+ }
+ }
+
+ MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
+ rmMsgCtx, ackOperation);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+ ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
+
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(msgContext.getEnvelope()));
+
+ //Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+ try {
+ ackMsgCtx.setEnvelope(envelope);
+ } catch (AxisFault e3) {
+ throw new SandeshaException(e3.getMessage());
+ }
+
+ ackMsgCtx.setTo(acksTo);
+ ackMsgCtx.setReplyTo(msgContext.getTo());
+ RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceID);
+
+ if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
+
+ AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
+ .getConfigurationContext());
+
+ //setting CONTEXT_WRITTEN since acksto is anonymous
+ if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+ //operation context will be null when doing in a GLOBAL
+ // handler.
+ try {
+ AxisOperation op = AxisOperationFactory
+ .getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
+ OperationContext opCtx = new OperationContext(op);
+ rmMsgCtx.getMessageContext().setAxisOperation(op);
+ rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+ } catch (AxisFault e2) {
+ throw new SandeshaException(e2.getMessage());
+ }
+ }
+
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+ org.apache.axis2.Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_TRUE);
+
+ rmMsgCtx.getMessageContext().setProperty(
+ Sandesha2Constants.ACK_WRITTEN, "true");
+ try {
+ engine.send(ackRMMsgCtx.getMessageContext());
+ } catch (AxisFault e1) {
+ throw new SandeshaException(e1.getMessage());
+ }
+ } else {
+
+ Transaction asyncAckTransaction = storageManager.getTransaction();
+
+ SenderBeanMgr retransmitterBeanMgr = storageManager
+ .getRetransmitterBeanMgr();
+
+ String key = SandeshaUtil.getUUID();
+
+ //dumping to the storage will be done be Sandesha2 Transport Sender
+ //storageManager.storeMessageContext(key,ackMsgCtx);
+
+ SenderBean ackBean = new SenderBean();
+ ackBean.setMessageContextRefKey(key);
+ ackBean.setMessageID(ackMsgCtx.getMessageID());
+ ackBean.setReSend(false);
+
+ //this will be set to true in the sender.
+ ackBean.setSend(true);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ //the internalSequenceId value of the retransmitter Table for the
+ // messages related to an incoming
+ //sequence is the actual sequence ID
+
+// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+
+// long ackInterval = PropertyManager.getInstance()
+// .getAcknowledgementInterval();
+
+ Parameter param = msgContext.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
+
+ SandeshaPropertyBean propertyBean = null;
+ if (param!=null) {
+ propertyBean = (SandeshaPropertyBean) param.getValue();
+ }else {
+ propertyBean = PropertyManager.getInstance().getPropertyBean();
+ }
+
+
+ long ackInterval = propertyBean.getAcknowledgementInaterval();
+
+ // if (policyBean != null) {
+// ackInterval = policyBean.getAcknowledgementInaterval();
+// }
+
+ //Ack will be sent as stand alone, only after the retransmitter
+ // interval.
+ long timeToSend = System.currentTimeMillis() + ackInterval;
+
+ //removing old acks.
+ SenderBean findBean = new SenderBean();
+ findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+ //this will be set to true in the sandesha2TransportSender.
+ findBean.setSend(true);
+ findBean.setReSend(false);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+
+ if (it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ timeToSend = oldAckBean.getTimeToSend(); //If there is an old ack. This ack will be sent in the old timeToSend.
+ retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+ }
+
+ ackBean.setTimeToSend(timeToSend);
+
+ storageManager.storeMessageContext(key,ackMsgCtx);
+
+ //inserting the new ack.
+ retransmitterBeanMgr.insert(ackBean);
+
+ asyncAckTransaction.commit();
+
+ //passing the message through sandesha2sender
+
+ ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
+ ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+
+ ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+
+ ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
+
+ AxisEngine engine = new AxisEngine (configurationContext);
+ try {
+ engine.send(ackMsgCtx);
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+
+ SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
+
+ msgContext.pause();
+ }
+ }
+
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Mar 6 18:39:01 2006
@@ -44,6 +44,7 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.transport.Sandesha2TransportSender;
+import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
@@ -71,6 +72,8 @@
throw new SandeshaException(message);
}
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
+
AbstractContext context = rmMsgCtx.getContext();
if (context == null) {
String message = "Context is null";
@@ -103,6 +106,35 @@
throw new SandeshaException(message);
}
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,outSequenceId);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
+ faultMessageContext = faultManager.checkForInvalidAcknowledgement(rmMsgCtx);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
//updating the last activated time of the sequence.
// Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
// SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Mar 6 18:39:01 2006
@@ -33,6 +33,7 @@
import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
@@ -48,6 +49,7 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
+import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.PropertyManager;
import org.apache.sandesha2.util.RMMsgCreator;
@@ -83,6 +85,9 @@
AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
ackProcessor.processMessage(rmMsgCtx);
}
+
+ //TODO process embedded ack requests
+
//Processing the application message.
MessageContext msgCtx = rmMsgCtx.getMessageContext();
@@ -100,7 +105,7 @@
return;
}
- //RM will not rend sync responses. If sync acks are there this will be
+ //RM will not send sync responses. If sync acks are there this will be
// made true again later.
if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
@@ -113,9 +118,24 @@
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForLastMsgNumberExceeded(rmMsgCtx);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
-
+
//setting acked msg no range
Sequence sequence = (Sequence) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -128,10 +148,43 @@
throw new SandeshaException(message);
}
+ faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,sequenceId);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.send(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
+
//setting mustUnderstand to false.
sequence.setMustUnderstand(false);
rmMsgCtx.addSOAPEnvelope();
+
+ //throwing a fault if the sequence is closed.
+ faultMessageContext = faultManager. checkForSequenceClosed(rmMsgCtx,sequenceId);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
+
+
Transaction lastUpdatedTimeTransaction = storageManager.getTransaction();
//updating the last activated time of the sequence.
@@ -306,197 +359,25 @@
LastMessage lastMessage = (LastMessage) sequence.getLastMessage();
- //Setting the ack depending on AcksTo.
- SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
- Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
-
- EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
- String acksToStr = acksTo.getAddress();
-
- if (acksToStr == null || messagesStr == null)
- throw new SandeshaException(
- "Seqeunce properties are not set correctly");
-
- if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
- // send ack in the sync case, only if the last message or the
- // ackRequested tag is present.
- boolean ackRequired = false;
- if (ackRequested != null || lastMessage != null)
- ackRequired = true;
-
- if (!ackRequired) {
- return;
- }
- }
- AxisOperation ackOperation = null;
-
- try {
- ackOperation = AxisOperationFactory.getOperationDescription(AxisOperationFactory.MEP_URI_IN_ONLY);
+ if (lastMessage!=null) {
+ long messageNumber = sequence.getMessageNumber().getMessageNumber();
+ SequencePropertyBean lastMessageBean = new SequencePropertyBean ();
+ lastMessageBean.setSequenceID(sequenceId);
+ lastMessageBean.setName(Sandesha2Constants.SequenceProperties.LAST_MESSAGE);
+ lastMessageBean.setValue(new Long(messageNumber).toString());
+
+ seqPropMgr.insert(lastMessageBean);
+ }
+
+ RMMsgContext ackRMMessage = AcknowledgementManager.generateAckMessage(rmMsgCtx,sequenceId);
+
+ AxisEngine engine = new AxisEngine (configCtx);
+
+ try {
+ engine.send(ackRMMessage.getMessageContext());
} catch (AxisFault e) {
- throw new SandeshaException("Could not create the Operation");
- }
-
- AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext()
- .getAxisOperation();
- if (rmMsgOperation != null) {
- ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
- if (outFlow != null) {
- ackOperation.setPhasesOutFlow(outFlow);
- ackOperation.setPhasesOutFaultFlow(outFlow);
- }
- }
-
- MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
- rmMsgCtx, ackOperation);
-
- ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-
- RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
- ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
-
- ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
-
- //Setting new envelope
- SOAPEnvelope envelope = factory.getDefaultEnvelope();
- try {
- ackMsgCtx.setEnvelope(envelope);
- } catch (AxisFault e3) {
- throw new SandeshaException(e3.getMessage());
- }
-
- ackMsgCtx.setTo(acksTo);
- ackMsgCtx.setReplyTo(msgCtx.getTo());
- RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
-
- if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
-
- AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
- .getConfigurationContext());
-
- //setting CONTEXT_WRITTEN since acksto is anonymous
- if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
- //operation context will be null when doing in a GLOBAL
- // handler.
- try {
- AxisOperation op = AxisOperationFactory
- .getAxisOperation(AxisOperationFactory.MEP_CONSTANT_IN_OUT);
- OperationContext opCtx = new OperationContext(op);
- rmMsgCtx.getMessageContext().setAxisOperation(op);
- rmMsgCtx.getMessageContext().setOperationContext(opCtx);
- } catch (AxisFault e2) {
- throw new SandeshaException(e2.getMessage());
- }
- }
-
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN,
- Constants.VALUE_TRUE);
-
- rmMsgCtx.getMessageContext().setProperty(
- Sandesha2Constants.ACK_WRITTEN, "true");
- try {
- engine.send(ackRMMsgCtx.getMessageContext());
- } catch (AxisFault e1) {
- throw new SandeshaException(e1.getMessage());
- }
- } else {
-
- Transaction asyncAckTransaction = storageManager.getTransaction();
-
- SenderBeanMgr retransmitterBeanMgr = storageManager
- .getRetransmitterBeanMgr();
-
- String key = SandeshaUtil.getUUID();
-
- //dumping to the storage will be done be Sandesha2 Transport Sender
- //storageManager.storeMessageContext(key,ackMsgCtx);
-
- SenderBean ackBean = new SenderBean();
- ackBean.setMessageContextRefKey(key);
- ackBean.setMessageID(ackMsgCtx.getMessageID());
- ackBean.setReSend(false);
-
- //this will be set to true in the sender.
- ackBean.setSend(true);
-
- ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
- Sandesha2Constants.VALUE_FALSE);
-
- ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
- //the internalSequenceId value of the retransmitter Table for the
- // messages related to an incoming
- //sequence is the actual sequence ID
-
-// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
-// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-
-// long ackInterval = PropertyManager.getInstance()
-// .getAcknowledgementInterval();
-
- Parameter param = msgCtx.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-
- SandeshaPropertyBean propertyBean = null;
- if (param!=null) {
- propertyBean = (SandeshaPropertyBean) param.getValue();
- }else {
- propertyBean = PropertyManager.getInstance().getPropertyBean();
- }
-
-
- long ackInterval = propertyBean.getAcknowledgementInaterval();
-
- // if (policyBean != null) {
-// ackInterval = policyBean.getAcknowledgementInaterval();
-// }
-
- //Ack will be sent as stand alone, only after the retransmitter
- // interval.
- long timeToSend = System.currentTimeMillis() + ackInterval;
-
- //removing old acks.
- SenderBean findBean = new SenderBean();
- findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
- //this will be set to true in the sandesha2TransportSender.
- findBean.setSend(true);
- findBean.setReSend(false);
- Collection coll = retransmitterBeanMgr.find(findBean);
- Iterator it = coll.iterator();
-
- if (it.hasNext()) {
- SenderBean oldAckBean = (SenderBean) it.next();
- timeToSend = oldAckBean.getTimeToSend(); //If there is an old ack. This ack will be sent in the old timeToSend.
- retransmitterBeanMgr.delete(oldAckBean.getMessageID());
- }
-
- ackBean.setTimeToSend(timeToSend);
-
- storageManager.storeMessageContext(key,ackMsgCtx);
-
- //inserting the new ack.
- retransmitterBeanMgr.insert(ackBean);
-
- asyncAckTransaction.commit();
-
- //passing the message through sandesha2sender
-
- ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
- ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-
- ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-
- ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
-
- AxisEngine engine = new AxisEngine (configCtx);
- try {
- engine.send(ackMsgCtx);
- } catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
- }
-
- SandeshaUtil.startSenderForTheSequence(configCtx,sequenceId);
+ String message = "Exception thrown while trying to send the ack message";
+ throw new SandeshaException (message,e);
}
-
}
}
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=383751&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Mar 6 18:39:01 2006
@@ -0,0 +1,121 @@
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.CloseSequenceResponse;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+
+public class CloseSequenceProcessor implements MsgProcessor {
+
+ public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ CloseSequence closeSequence = (CloseSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
+
+ String sequenceID = closeSequence.getIdentifier().getIdentifier();
+
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(rmMsgCtx,sequenceID);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = msgCtx.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx);
+
+ Transaction closeSequenceTransaction = storageManager.getTransaction();
+
+ SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean sequenceClosedBean = new SequencePropertyBean ();
+ sequenceClosedBean.setSequenceID(sequenceID);
+ sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
+ sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
+
+ sequencePropMgr.insert(sequenceClosedBean);
+
+ RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx,sequenceID);
+
+ MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+
+ String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
+ ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
+
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
+
+ //Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+ try {
+ ackMsgCtx.setEnvelope(envelope);
+ } catch (AxisFault e3) {
+ throw new SandeshaException(e3.getMessage());
+ }
+
+ //adding the ack part to the envelope.
+ SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+ MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
+
+ MessageContext closeSequenceResponseMsg = null;
+ closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
+
+ RMMsgContext closeSeqResponseRMMsg = RMMsgCreator
+ .createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg);
+
+ closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,sequenceAcknowledgement);
+
+ closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+ closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ closeSequenceResponseMsg.setResponseWritten(true);
+
+ closeSeqResponseRMMsg.addSOAPEnvelope();
+
+ AxisEngine engine = new AxisEngine (closeSequenceMsg.getConfigurationContext());
+
+ try {
+ engine.send(closeSequenceResponseMsg);
+ } catch (AxisFault e) {
+ String message = "Could not send the terminate sequence response";
+ throw new SandeshaException (message,e);
+ }
+
+
+ closeSequenceTransaction.commit();
+ }
+
+
+
+
+
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Mar 6 18:39:01 2006
@@ -34,6 +34,7 @@
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
@@ -64,6 +65,21 @@
throw new SandeshaException(message);
}
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForCreateSequenceRefused(createSeqMsg);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = createSeqMsg.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
+ }
+
MessageContext outMessage = null;
outMessage = Utils.createOutMessageContext(createSeqMsg);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Mar 6 18:39:01 2006
@@ -64,7 +64,7 @@
public void processMessage(RMMsgContext createSeqResponseRMMsgCtx)
throws SandeshaException {
-
+
SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
.getSOAPVersion(createSeqResponseRMMsgCtx.getSOAPEnvelope()));
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessorFactory.java Mon Mar 6 18:39:01 2006
@@ -41,6 +41,10 @@
return new CreateSeqResponseMsgProcessor();
case (Sandesha2Constants.MessageTypes.ACK):
return new AcknowledgementProcessor();
+ case (Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE):
+ return new CloseSequenceProcessor ();
+ case (Sandesha2Constants.MessageTypes.ACK_REQUEST):
+ return new AckRequestedProcessor ();
default:
return null;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=383751&r1=383750&r2=383751&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Mar 6 18:39:01 2006
@@ -33,6 +33,7 @@
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
@@ -76,6 +77,21 @@
String message = "Invalid sequence id";
log.debug(message);
throw new SandeshaException (message);
+ }
+
+ FaultManager faultManager = new FaultManager();
+ RMMsgContext faultMessageContext = faultManager.checkForUnknownSequence(terminateSeqRMMsg,sequenceId);
+ if (faultMessageContext != null) {
+ ConfigurationContext configurationContext = terminateSeqMsg.getConfigurationContext();
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ try {
+ engine.sendFault(faultMessageContext.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not send the fault message",e);
+ }
+
+ return;
}
ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org