You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/03/17 06:02:01 UTC
svn commit: r386537 [1/2] - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ client/
client/reports/ handlers/ msgprocessors/ storage/beanmanagers/
storage/inmemory/ util/ workers/
Author: chamikara
Date: Thu Mar 16 21:01:58 2006
New Revision: 386537
URL: http://svn.apache.org/viewcvs?rev=386537&view=rev
Log:
Updates to support most of the WSRX scenarios 1.1 upto 2.3
Correnctions to the Policy handling code.
Removed:
webservices/sandesha/trunk/src/org/apache/sandesha2/client/SequenceReport.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryCreateSeqBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryNextMsgBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySequencePropertyBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Thu Mar 16 21:01:58 2006
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import org.apache.axis2.AxisFault;
@@ -46,6 +47,7 @@
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaPropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.ws.commons.soap.SOAPEnvelope;
@@ -196,7 +198,7 @@
ArrayList completedMsgList = null;
if (completedMessagesBean!=null) {
- completedMsgList = SandeshaUtil.getArrayListFromString(completedMessagesBean.getValue());
+ completedMsgList = SandeshaUtil.getArrayListFromMsgsString (completedMessagesBean.getValue());
} else {
String message = "Completed messages bean is null, for the sequence " + sequenceID;
throw new SandeshaException (message);
@@ -295,11 +297,7 @@
referenceRMMessage.getMessageContext().setProperty(
Sandesha2Constants.ACK_WRITTEN, "true");
-// try {
-// engine.send(ackRMMsgCtx.getMessageContext());
-// } catch (AxisFault e1) {
-// throw new SandeshaException(e1.getMessage());
-// }
+
return ackRMMsgCtx;
} else {
@@ -311,9 +309,6 @@
String key = SandeshaUtil.getUUID();
- //dumping to the storage will be done be Sandesha2 Transport Sender
- //storageManager.storeMessageContext(key,ackMsgCtx);
-
SenderBean ackBean = new SenderBean();
ackBean.setMessageContextRefKey(key);
ackBean.setMessageID(ackMsgCtx.getMessageID());
@@ -326,32 +321,7 @@
Sandesha2Constants.VALUE_FALSE);
ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-
- //the internalSequenceId value of the retransmitter Table for the
- // messages related to an incoming
- //sequence is the actual sequence ID
-
-// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
-// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
-
-// long ackInterval = PropertyManager.getInstance()
-// .getAcknowledgementInterval();
-
- Parameter param = referenceMsg.getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
-
- SandeshaPropertyBean propertyBean = null;
- if (param!=null) {
- propertyBean = (SandeshaPropertyBean) param.getValue();
- }else {
- propertyBean = PropertyManager.getInstance().getPropertyBean();
- }
-
-
- long ackInterval = propertyBean.getAcknowledgementInaterval();
-
- // if (policyBean != null) {
-// ackInterval = policyBean.getAcknowledgementInaterval();
-// }
+ long ackInterval = SandeshaUtil.getPropretyBean(referenceMsg).getAcknowledgementInaterval();
//Ack will be sent as stand alone, only after the retransmitter
// interval.
@@ -374,40 +344,50 @@
}
ackBean.setTimeToSend(timeToSend);
-
storageManager.storeMessageContext(key,ackMsgCtx);
//inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
-
asyncAckTransaction.commit();
//passing the message through sandesha2sender
-
ackMsgCtx.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,ackMsgCtx.getTransportOut());
ackMsgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
-
ackMsgCtx.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
-
ackMsgCtx.setTransportOut(new Sandesha2TransportOutDesc ());
-
-// AxisEngine engine = new AxisEngine (configurationContext);
-// try {
-// engine.send(ackMsgCtx);
-// } catch (AxisFault e) {
-// throw new SandeshaException (e.getMessage());
-// }
-
RMMsgContext ackRMMessageCtx = MsgInitializer.initializeMessage(ackMsgCtx);
-
- SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
-
+ SandeshaUtil.startSenderForTheSequence(configurationContext,sequenceID);
referenceMsg.pause();
-
return ackRMMessageCtx;
+ }
+ }
+
+ public static boolean verifySequenceCompletion(Iterator ackRangesIterator,
+ long lastMessageNo) {
+ HashMap startMap = new HashMap();
+
+ while (ackRangesIterator.hasNext()) {
+ AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator
+ .next();
+ startMap.put(new Long(temp.getLowerValue()), temp);
}
-
-
-
+
+ long start = 1;
+ boolean loop = true;
+ while (loop) {
+ AcknowledgementRange temp = (AcknowledgementRange) startMap
+ .get(new Long(start));
+ if (temp == null) {
+ loop = false;
+ continue;
+ }
+
+ if (temp.getUpperValue() >= lastMessageNo)
+ return true;
+
+ start = temp.getUpperValue() + 1;
+ }
+
+ return false;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Thu Mar 16 21:01:58 2006
@@ -77,7 +77,7 @@
String ACTION_TERMINATE_SEQUENCE_RESPONSE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/TerminateSequenceResponse";
- String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200602/AckRequested";
+ String ACTION_ACK_REQUEST = "http://docs.oasis-open.org/ws-rx/wsrm/200510/AckRequested";
String ACTION_CLOSE_SEQUENCE = "http://docs.oasis-open.org/ws-rx/wsrm/200510/CloseSequence";
@@ -250,8 +250,6 @@
String NEXT_MESSAGE_NUMBER = "NextMsgNo";
- String LAST_OUT_MESSAGE = "LastOutMessage";
-
String INCOMING_SEQUENCE_LIST = "IncomingSequenceList";
String CHECK_RESPONSE = "CheckResponse";
@@ -280,9 +278,17 @@
String REQUEST_SIDE_SEQUENCE_ID = "RequestSideSequenceID"; //used only at the server side
- String HIGHEST_MSG_NO = "HighestMessageNumber";
+ String HIGHEST_IN_MSG_NUMBER = "HighestInMsgNumber";
+
+ String HIGHEST_IN_MSG_KEY = "HighestInMsgKey";
+
+ String HIGHEST_OUT_MSG_NUMBER = "HighestOutMsgNumber";
+
+ String HIGHEST_OUT_MSG_KEY = "HighestOutMsgKey";
+ String LAST_OUT_MESSAGE_NO = "LastOutMessage";
+ String LAST_IN_MESSAGE_NO = "LastInMessage";
}
public interface SOAPVersion {
@@ -443,8 +449,6 @@
String VALUE_FALSE = "false";
- String SANDESHA2_INTERNAL_SEQUENCE_ID = "Sandesha2IntSeq";
-
String MESSAGE_STORE_KEY = "Sandesha2MessageStoreKey";
String ORIGINAL_TRANSPORT_OUT_DESC = "Sandesha2OriginalTransportSender";
@@ -462,4 +466,9 @@
String INTERNAL_SEQUENCE_PREFIX = "Sandesha2InternalSequence";
String SANDESHA2_POLICY_BEAN = "Sandesha2PolicyBean";
+
+ String LIST_SEPERATOR = ",";
+
+ String LIST_PART_SEPERATOR = "-";
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Thu Mar 16 21:01:58 2006
@@ -169,10 +169,10 @@
}
private static boolean isRequiredForResponseSide (String name) {
- if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ if (name==null && name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
return false;
- if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE))
+ if (name.equals(Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO))
return false;
return false;
@@ -331,6 +331,234 @@
SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
}
+// public void terminateSequence (String outSequenceID,String internalSequenceID,ConfigurationContext configCtx) {
+//
+//
+// StorageManager storageManager = SandeshaUtil
+// .getSandeshaStorageManager(configCtx);
+//
+// Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+//
+// SequencePropertyBeanMgr seqPropMgr = storageManager
+// .getSequencePropretyBeanMgr();
+//
+// SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceID,
+// Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+//
+// if (terminated != null && terminated.getValue() != null
+// && "true".equals(terminated.getValue())) {
+// String message = "Terminate was added previously.";
+// log.info(message);
+// return;
+// }
+//
+// RMMsgContext terminateRMMessage = RMMsgCreator
+// .createTerminateSequenceMessage(outSequenceID);
+// terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+// terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+//
+// SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceID,
+// Sandesha2Constants.SequenceProperties.TO_EPR);
+//
+// EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+// if (toEPR == null) {
+// String message = "To EPR has an invalid value";
+// throw new SandeshaException(message);
+// }
+//
+// terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+// terminateRMMessage.setFrom(new EndpointReference(
+// Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+// terminateRMMessage.setFaultTo(new EndpointReference(
+// Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+//
+// String rmVersion = SandeshaUtil.getRMVersion(internalSequenceID,configCtx);
+// if (rmVersion==null)
+// throw new SandeshaException ("Cant find the rmVersion of the given message");
+// terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+// terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+//
+// SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+// if (transportToBean!=null) {
+// terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+// }
+//
+// try {
+// terminateRMMessage.addSOAPEnvelope();
+// } catch (AxisFault e) {
+// throw new SandeshaException(e.getMessage());
+// }
+//
+// String key = SandeshaUtil.getUUID();
+//
+// SenderBean terminateBean = new SenderBean();
+// terminateBean.setMessageContextRefKey(key);
+//
+//
+// storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+//
+//
+// //Set a retransmitter lastSentTime so that terminate will be send with
+// // some delay.
+// //Otherwise this get send before return of the current request (ack).
+// //TODO: refine the terminate delay.
+// terminateBean.setTimeToSend(System.currentTimeMillis()
+// + Sandesha2Constants.TERMINATE_DELAY);
+//
+// terminateBean.setMessageID(terminateRMMessage.getMessageId());
+//
+// //this will be set to true at the sender.
+// terminateBean.setSend(true);
+//
+// terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+// Sandesha2Constants.VALUE_FALSE);
+//
+// terminateBean.setReSend(false);
+//
+// SenderBeanMgr retramsmitterMgr = storageManager
+// .getRetransmitterBeanMgr();
+//
+// retramsmitterMgr.insert(terminateBean);
+//
+// SequencePropertyBean terminateAdded = new SequencePropertyBean();
+// terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+// terminateAdded.setSequenceID(outSequenceID);
+// terminateAdded.setValue("true");
+//
+// seqPropMgr.insert(terminateAdded);
+//
+// //This should be dumped to the storage by the sender
+// TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+// terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+// terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+// terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+// terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+// addTerminateSeqTransaction.commit();
+//
+// AxisEngine engine = new AxisEngine (incomingAckRMMsg.getMessageContext().getConfigurationContext());
+// try {
+// engine.send(terminateRMMessage.getMessageContext());
+// } catch (AxisFault e) {
+// throw new SandeshaException (e.getMessage());
+// }
+//
+// }
+
+ public static void addTerminateSequenceMessage(RMMsgContext referenceMessage,
+ String outSequenceId, String internalSequenceId)
+ throws SandeshaException {
+
+
+ ConfigurationContext configurationContext = referenceMessage.getMessageContext().getConfigurationContext();
+ StorageManager storageManager = SandeshaUtil
+ .getSandeshaStorageManager(configurationContext);
+
+ Transaction addTerminateSeqTransaction = storageManager.getTransaction();
+
+ SequencePropertyBeanMgr seqPropMgr = storageManager
+ .getSequencePropretyBeanMgr();
+
+ SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
+ Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+
+ if (terminated != null && terminated.getValue() != null
+ && "true".equals(terminated.getValue())) {
+ String message = "Terminate was added previously.";
+ log.info(message);
+// return;
+ }
+
+ RMMsgContext terminateRMMessage = RMMsgCreator
+ .createTerminateSequenceMessage(referenceMessage, outSequenceId,internalSequenceId);
+ terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
+ terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
+
+ SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
+ Sandesha2Constants.SequenceProperties.TO_EPR);
+
+ EndpointReference toEPR = new EndpointReference ( toBean.getValue());
+ if (toEPR == null) {
+ String message = "To EPR has an invalid value";
+ throw new SandeshaException(message);
+ }
+
+ terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+ terminateRMMessage.setFrom(new EndpointReference(
+ Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+ terminateRMMessage.setFaultTo(new EndpointReference(
+ Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
+
+ String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
+ if (rmVersion==null)
+ throw new SandeshaException ("Cant find the rmVersion of the given message");
+ terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
+ terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
+
+ SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
+ if (transportToBean!=null) {
+ terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
+ }
+
+ try {
+ terminateRMMessage.addSOAPEnvelope();
+ } catch (AxisFault e) {
+ throw new SandeshaException(e.getMessage());
+ }
+
+ String key = SandeshaUtil.getUUID();
+
+ SenderBean terminateBean = new SenderBean();
+ terminateBean.setMessageContextRefKey(key);
+
+
+ storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
+
+
+ //Set a retransmitter lastSentTime so that terminate will be send with
+ // some delay.
+ //Otherwise this get send before return of the current request (ack).
+ //TODO: refine the terminate delay.
+ terminateBean.setTimeToSend(System.currentTimeMillis()
+ + Sandesha2Constants.TERMINATE_DELAY);
+
+ terminateBean.setMessageID(terminateRMMessage.getMessageId());
+
+ //this will be set to true at the sender.
+ terminateBean.setSend(true);
+
+ terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
+ Sandesha2Constants.VALUE_FALSE);
+
+ terminateBean.setReSend(false);
+
+ SenderBeanMgr retramsmitterMgr = storageManager
+ .getRetransmitterBeanMgr();
+
+ retramsmitterMgr.insert(terminateBean);
+
+ SequencePropertyBean terminateAdded = new SequencePropertyBean();
+ terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
+ terminateAdded.setSequenceID(outSequenceId);
+ terminateAdded.setValue("true");
+
+ seqPropMgr.insert(terminateAdded);
+
+ //This should be dumped to the storage by the sender
+ TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
+ terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
+ terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
+ terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
+ terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
+ addTerminateSeqTransaction.commit();
+
+ AxisEngine engine = new AxisEngine (configurationContext);
+ try {
+ engine.send(terminateRMMessage.getMessageContext());
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/Sandesha2ClientAPI.java Thu Mar 16 21:01:58 2006
@@ -20,12 +20,18 @@
import java.util.Collection;
import java.util.Iterator;
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.SpecSpecificConstants;
import org.apache.sandesha2.client.reports.RMReport;
import org.apache.sandesha2.client.reports.SequenceReport;
import org.apache.sandesha2.storage.StorageManager;
@@ -38,6 +44,14 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.TerminateSequence;
+import org.apache.ws.commons.om.OMException;
+import org.apache.ws.commons.soap.SOAP12Constants;
+import org.apache.ws.commons.soap.SOAPEnvelope;
+import org.apache.ws.commons.soap.SOAPFactory;
+import org.apache.ws.commons.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.ws.commons.soap.impl.llom.soap12.SOAP12Factory;
/**
* Contains all the Sandesha2Constants of Sandesha2.
@@ -199,6 +213,8 @@
}
private static void fillOutgoingSequenceInfo (SequenceReport report,String outSequenceID, SequencePropertyBeanMgr seqPropMgr) throws SandeshaException {
+ report.setSequenceID(outSequenceID);
+
ArrayList completedMessageList = AcknowledgementManager.getClientCompletedMessagesList (outSequenceID,seqPropMgr);
Iterator iter = completedMessageList.iterator();
@@ -226,9 +242,8 @@
ArrayList completedMessageList = AcknowledgementManager.getServerCompletedMessagesList (sequenceID,seqPropMgr);
Iterator iter = completedMessageList.iterator();
- while (iter.hasNext()) {
- Long lng = new Long (Long.parseLong((String) iter.next()));
- sequenceReport.addCompletedMessage(lng);
+ while (iter.hasNext()) {;
+ sequenceReport.addCompletedMessage((Long) iter.next());
}
sequenceReport.setSequenceDirection(SequenceReport.SEQUENCE_DIRECTION_IN);
@@ -315,9 +330,75 @@
return rmReport;
}
- public String InternalSequenceID (String to, String sequenceKey) {
+ public static String getInternalSequenceID (String to, String sequenceKey) {
return SandeshaUtil.getInternalSequenceID(to,sequenceKey);
}
+ public static void terminateSequence (String toEPR, String sequenceKey, ServiceClient serviceClient,ConfigurationContext configurationContext) throws SandeshaException {
+
+ String internalSequenceID = SandeshaUtil.getInternalSequenceID(toEPR,sequenceKey);
+
+ SequenceReport sequenceReport = Sandesha2ClientAPI.getOutgoingSequenceReport(internalSequenceID,configurationContext);
+ if (sequenceReport==null)
+ throw new SandeshaException ("Cannot generate the sequence report for the given internalSequenceID");
+ if (sequenceReport.getSequenceStatus()!=SequenceReport.SEQUENCE_STATUS_ESTABLISHED)
+ throw new SandeshaException ("Canot terminate the sequence since it is not active");
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+ SequencePropertyBeanMgr seqPropMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean sequenceIDBean = seqPropMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
+ if (sequenceIDBean==null)
+ throw new SandeshaException ("SequenceIdBean is not set");
+
+ String sequenceID = sequenceIDBean.getValue();
+
+ if (sequenceID==null)
+ throw new SandeshaException ("Cannot find the sequenceID");
+
+ Options options = serviceClient.getOptions();
+
+ String rmSpecVersion = (String) options.getProperty(Sandesha2ClientAPI.RM_SPEC_VERSION);
+
+ if (rmSpecVersion==null)
+ rmSpecVersion = SpecSpecificConstants.getDefaultSpecVersion ();
+
+ String oldAction = options.getAction();
+
+ options.setAction(SpecSpecificConstants.getTerminateSequenceAction(rmSpecVersion));
+
+ SOAPEnvelope dummyEnvelope = null;
+ SOAPFactory factory = null;
+ String soapNamespaceURI = options.getSoapVersionURI();
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(soapNamespaceURI)) {
+ factory = new SOAP12Factory ();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ }else {
+ factory = new SOAP11Factory ();
+ dummyEnvelope = factory.getDefaultEnvelope();
+ }
+
+ String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmSpecVersion);
+
+ TerminateSequence terminateSequence = new TerminateSequence (factory,rmNamespaceValue);
+ Identifier identifier = new Identifier (factory,rmNamespaceValue);
+ identifier.setIndentifer(sequenceID);
+ terminateSequence.setIdentifier(identifier);
+
+ terminateSequence.toSOAPEnvelope(dummyEnvelope);
+
+ String oldSequenceKey = (String) options.getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
+ options.setProperty(Sandesha2ClientAPI.SEQUENCE_KEY,sequenceKey);
+ try {
+ serviceClient.fireAndForget(dummyEnvelope.getBody().getFirstChildWithName(new QName (rmNamespaceValue,Sandesha2Constants.WSRM_COMMON.TERMINATE_SEQUENCE)));
+ } catch (AxisFault e) {
+ throw new SandeshaException ("Could not invoke the service client", e);
+ }
+
+ if (oldSequenceKey!=null)
+ options.setProperty(Sandesha2ClientAPI.SEQUENCE_KEY,oldSequenceKey);
+
+ options.setAction(oldAction);
+ }
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/client/reports/RMReport.java Thu Mar 16 21:01:58 2006
@@ -84,7 +84,7 @@
}
public void addToOutgoingInternalSequenceMap (String outSequenceID, String internalSequenceID) {
- outgoingInternalSequenceIDMap.put(outSequenceID,outSequenceID);
+ outgoingInternalSequenceIDMap.put(outSequenceID,internalSequenceID);
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Mar 16 21:01:58 2006
@@ -84,16 +84,9 @@
MsgProcessor msgProcessor = MsgProcessorFactory
.getMessageProcessor(rmMsgCtx.getMessageType());
- if (msgProcessor == null) {
-// String message = "An Invalid RM message was received. Sandesha2 cant forward this request";
-// log.debug(message);
-// throw new AxisFault(message);
-
- return; //this is not a rm message
- }
-
try {
- msgProcessor.processMessage(rmMsgCtx);
+ if (msgProcessor!=null)
+ msgProcessor.processInMessage(rmMsgCtx);
} catch (SandeshaException se) {
String message = "Error in processing the message";
log.debug(message);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Thu Mar 16 21:01:58 2006
@@ -41,6 +41,9 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SpecSpecificConstants;
import org.apache.sandesha2.client.Sandesha2ClientAPI;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
@@ -114,755 +117,30 @@
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(context);
- ServiceContext serviceContext = msgCtx.getServiceContext();
- OperationContext operationContext = msgCtx.getOperationContext();
- // continue only if an possible application message
- if (!(rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.UNKNOWN)) {
- return;
- }
-
- MessageContext requestMessageCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMessageCtx!=null) {
- RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMessageCtx);
- Sequence reqSeqPart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (reqSeqPart==null) {
- //this is not a rm intended message
- return;
- }
- }
-
- Parameter policyParam = msgCtx
- .getParameter(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
- if (policyParam == null) {
- SandeshaPropertyBean propertyBean = PropertyManager.getInstance()
- .getPropertyBean();
- Parameter parameter = new Parameter();
- parameter.setName(Sandesha2Constants.SANDESHA2_POLICY_BEAN);
- parameter.setValue(propertyBean);
-
- // TODO this should be addede to the AxisMessage
- if (msgCtx.getAxisOperation() != null)
- msgCtx.getAxisOperation().addParameter(parameter);
- else if (msgCtx.getAxisService() != null)
- msgCtx.getAxisService().addParameter(parameter);
- }
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
-
- Transaction outHandlerTransaction = storageManager.getTransaction();
-
- boolean serverSide = msgCtx.isServerSide();
-
- // setting message Id if null
- if (msgCtx.getMessageID() == null) {
- msgCtx.setMessageID(SandeshaUtil.getUUID());
- }
-
- // find internal sequence id
- String internalSequenceId = null;
-
- /* Internal sequence id is the one used to refer to the sequence (since
- actual sequence id is not available when first msg arrives)
- server side - sequenceId if the incoming sequence
- client side - wsaTo + SeequenceKey */
-
- if (serverSide) {
- // getting the request message and rmMessage.
- MessageContext reqMsgCtx = msgCtx.getOperationContext()
- .getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-
- RMMsgContext requestRMMsgCtx = MsgInitializer
- .initializeMessage(reqMsgCtx);
-
- Sequence reqSequence = (Sequence) requestRMMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (reqSequence == null) {
- String message = "Sequence part is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String incomingSeqId = reqSequence.getIdentifier().getIdentifier();
- if (incomingSeqId == null || incomingSeqId == "") {
- String message = "Invalid seqence Id";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- internalSequenceId = SandeshaUtil
- .getServerSideInternalSeqIdFromIncomingSeqId(incomingSeqId);
-
- } else {
- // set the internal sequence id for the client side.
- EndpointReference toEPR = msgCtx.getTo();
- if (toEPR == null || toEPR.getAddress() == null
- || "".equals(toEPR.getAddress())) {
- String message = "TO End Point Reference is not set correctly. This is a must for the sandesha client side.";
- log.debug(message);
- throw new AxisFault(message);
- }
-
- String to = toEPR.getAddress();
- String sequenceKey = (String) msgCtx
- .getProperty(Sandesha2ClientAPI.SEQUENCE_KEY);
-
- internalSequenceId = SandeshaUtil.getInternalSequenceID(to,
- sequenceKey);
-
- }
-
- /* checking weather the user has given the messageNumber (most of the cases this will not be the case).
- In that case the system will generate the message number */
-
- //User should set it as a long object.
- Long messageNumberLng = (Long) msgCtx.getProperty(Sandesha2ClientAPI.MESSAGE_NUMBER);
-
- long givenMessageNumber = -1;
- if (messageNumberLng!=null) {
- givenMessageNumber = messageNumberLng.longValue();
- if (givenMessageNumber<=0) {
- throw new SandeshaException ("The givem message number value is invalid (has to be larger than zero)");
- }
- }
-
- //the message number that was last used.
- long systemMessageNumber = getPreviousMsgNo(context, internalSequenceId);
-
- //The number given by the user has to be larger than the last stored number.
- if (givenMessageNumber>0 && givenMessageNumber<=systemMessageNumber) {
- String message = "The given message number is not larger than value of the last sent message.";
- throw new SandeshaException (message);
- }
-
- //Finding the correct message number.
- long messageNumber = -1;
- if (givenMessageNumber>0) // if given message number is valid use it. (this is larger than the last stored due to the last check)
- messageNumber = givenMessageNumber;
- else if (systemMessageNumber>0) { //if system message number is valid use it.
- messageNumber = systemMessageNumber+1;
- } else { //This is the fist message (systemMessageNumber = -1)
- messageNumber = 1;
- }
-
- //saving the used message number
- if (!dummyMessage)
- setNextMsgNo(context,internalSequenceId,messageNumber);
-
- boolean sendCreateSequence = false;
- SequencePropertyBean outSeqBean = seqPropMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
- // setting async ack endpoint for the server side. (if present)
- if (serverSide) {
- String incomingSequenceID = SandeshaUtil
- .getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
- SequencePropertyBean incomingToBean = seqPropMgr.retrieve(
- incomingSequenceID,
- Sandesha2Constants.SequenceProperties.TO_EPR);
- if (incomingToBean != null) {
- String incomingTo = incomingToBean.getValue();
- msgCtx.setProperty(Sandesha2ClientAPI.AcksTo, incomingTo);
- }
- }
-
-
- String specVersion = null;
-
- if (msgCtx.isServerSide()) {
- //in the server side, get the RM version from the request sequence.
- MessageContext requestMessageContext = msgCtx.getOperationContext().getMessageContext(AxisOperationFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMessageContext==null)
- throw new SandeshaException ("Request message context is null, cant find out the request side sequenceID");
-
- RMMsgContext requestRMMsgCtx = MsgInitializer.initializeMessage(requestMessageContext);
- Sequence sequence = (Sequence) requestRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- String requestSequenceID = sequence.getIdentifier().getIdentifier();
- SequencePropertyBean specVersionBean = seqPropMgr.retrieve(requestSequenceID,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
- if (specVersionBean==null)
- throw new SandeshaException ("SpecVersion sequence property bean is not available for the incoming sequence. Cant find the RM version for outgoing side");
-
- specVersion = specVersionBean.getValue();
-
-
- } else {
- //in the client side, user will set the RM version.
- specVersion = (String) msgCtx.getProperty(Sandesha2ClientAPI.RM_SPEC_VERSION);
- }
-
-
- if (specVersion==null) {
- specVersion = Sandesha2Constants.SPEC_VERSIONS.WSRM; //TODO change the default to WSRX.
+ MsgProcessor msgProcessor = null;
+ int messageType = rmMsgCtx.getMessageType();
+ if (messageType==Sandesha2Constants.MessageTypes.UNKNOWN) {
+ MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMsgCtx!=null) { //for the server side
+ RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+ Sequence sequencePart = (Sequence) reqRMMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart!=null)
+ msgProcessor = new ApplicationMsgProcessor ();// a rm intended message.
+ } else if (!msgCtx.isServerSide()) //if client side.
+ msgProcessor = new ApplicationMsgProcessor ();
+ }else {
+ msgProcessor = MsgProcessorFactory.getMessageProcessor(messageType);
}
-
- if (messageNumber == 1) {
- if (outSeqBean == null) {
- sendCreateSequence = true; // message number being one and not having an out sequence, implies that a create sequence has to be send.
- }
-
- // if fist message - setup the sending side sequence - both for the server and the client sides
- SequenceManager.setupNewClientSequence(msgCtx, internalSequenceId,specVersion);
- }
-
- if (sendCreateSequence) {
- SequencePropertyBean responseCreateSeqAdded = seqPropMgr
- .retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT);
-
- if (responseCreateSeqAdded == null) {
- responseCreateSeqAdded = new SequencePropertyBean(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.OUT_CREATE_SEQUENCE_SENT,
- "true");
- seqPropMgr.insert(responseCreateSeqAdded);
-
- String acksTo = null;
- if (serviceContext != null) {
- acksTo = (String) msgCtx
- .getProperty(Sandesha2ClientAPI.AcksTo);
- }
-
- if (msgCtx.isServerSide()) {
- // we do not set acksTo value to anonymous when the create
- // sequence is send from the server.
- MessageContext requestMessage = operationContext
- .getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMessage == null) {
- String message = "Request message is not present";
- log.debug(message);
- throw new SandeshaException(message);
- }
- acksTo = requestMessage.getTo().getAddress();
-
- } else {
- if (acksTo == null)
- acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
- }
-
- if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo) && !serverSide) {
- String transportIn = (String) context
- .getProperty(MessageContext.TRANSPORT_IN);
- if (transportIn == null)
- transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
- } else if (acksTo == null && serverSide) {
- String incomingSequencId = SandeshaUtil
- .getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
- SequencePropertyBean bean = seqPropMgr.retrieve(
- incomingSequencId,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
- if (bean != null) {
- EndpointReference acksToEPR = new EndpointReference(bean.getValue());
- if (acksToEPR != null)
- acksTo = (String) acksToEPR.getAddress();
- }
- } else if (Sandesha2Constants.WSA.NS_URI_ANONYMOUS
- .equals(acksTo)) {
- // set transport in.
- Object trIn = msgCtx
- .getProperty(MessageContext.TRANSPORT_IN);
- if (trIn == null) {
- //TODO
- }
- }
-
- addCreateSequenceMessage(rmMsgCtx, internalSequenceId, acksTo);
- }
- }
-
- SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
- if (env == null) {
- SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
- SandeshaUtil.getSOAPVersion(env)).getDefaultEnvelope();
- rmMsgCtx.setSOAPEnvelop(envelope);
- }
-
- SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
- if (soapBody == null) {
- String message = "Invalid SOAP message. Body is not present";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String messageId1 = SandeshaUtil.getUUID();
- if (rmMsgCtx.getMessageId() == null) {
- rmMsgCtx.setMessageId(messageId1);
- }
-
-
-
-
-
-
-
- if (serverSide) {
- // let the request end with 202 if a ack has not been
- // written in the incoming thread.
-
- MessageContext reqMsgCtx = msgCtx.getOperationContext()
- .getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-
- if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
- || !"true".equals(reqMsgCtx
- .getProperty(Sandesha2Constants.ACK_WRITTEN)))
- reqMsgCtx.getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
- }
-
-
- EndpointReference toEPR = msgCtx.getTo();
- if (toEPR == null) {
- String message = "To EPR is not found";
- log.debug(message);
- throw new SandeshaException(message);
- }
- String to = toEPR.getAddress();
-
- String operationName = msgCtx.getOperationContext().getAxisOperation().getName().getLocalPart();
-
- if (msgCtx.getWSAAction() == null) {
- msgCtx.setWSAAction(to + "/" + operationName);
- }
-
- if (msgCtx.getSoapAction() == null) {
- msgCtx.setSoapAction("\"" + to + "/" + operationName + "\"");
- }
-
- // processing the response
- if (!dummyMessage)
- processResponseMessage(rmMsgCtx, internalSequenceId, messageNumber);
-
- msgCtx.pause(); // the execution will be stopped.
- outHandlerTransaction.commit();
- }
-
- public void addCreateSequenceMessage(RMMsgContext applicationRMMsg,
- String internalSequenceId, String acksTo) throws SandeshaException {
-
- MessageContext applicationMsg = applicationRMMsg.getMessageContext();
- if (applicationMsg == null) {
- String message = "Message context is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(
- applicationRMMsg, internalSequenceId, acksTo);
- createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
- CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
- .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
- if (createSequencePart == null) {
- String message = "Create Sequence part is null for a CreateSequence message";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(applicationMsg
- .getConfigurationContext());
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
-
- SequenceOffer offer = createSequencePart.getSequenceOffer();
- if (offer != null) {
- String offeredSequenceId = offer.getIdentifer().getIdentifier();
- SequencePropertyBean msgsBean = new SequencePropertyBean();
- msgsBean.setSequenceID(offeredSequenceId);
- msgsBean.setName(Sandesha2Constants.SequenceProperties.CLIENT_COMPLETED_MESSAGES);
- msgsBean.setValue("");
-
- SequencePropertyBean offeredSequenceBean = new SequencePropertyBean();
- offeredSequenceBean
- .setName(Sandesha2Constants.SequenceProperties.OFFERED_SEQUENCE);
- offeredSequenceBean.setSequenceID(internalSequenceId);
- offeredSequenceBean.setValue(offeredSequenceId);
-
- seqPropMgr.insert(msgsBean);
- seqPropMgr.insert(offeredSequenceBean);
- }
-
- MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
- createSeqMsg.setRelatesTo(null); // create seq msg does not relateTo anything
- AbstractContext context = applicationRMMsg.getContext();
- if (context == null) {
- String message = "Context is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- CreateSeqBeanMgr createSeqMgr = storageManager.getCreateSeqBeanMgr();
-
- CreateSeqBean createSeqBean = new CreateSeqBean(internalSequenceId,
- createSeqMsg.getMessageID(), null);
- createSeqMgr.insert(createSeqBean);
-
- if (createSeqMsg.getReplyTo() == null)
- createSeqMsg.setReplyTo(new EndpointReference(
- Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
-
- SenderBeanMgr retransmitterMgr = storageManager
- .getRetransmitterBeanMgr();
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean createSeqEntry = new SenderBean();
- createSeqEntry.setMessageContextRefKey(key);
- createSeqEntry.setTimeToSend(System.currentTimeMillis());
- createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
-
- // this will be set to true in the sender
- createSeqEntry.setSend(true);
-
- createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
- Sandesha2Constants.VALUE_FALSE);
-
- createSeqEntry
- .setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
- retransmitterMgr.insert(createSeqEntry);
-
- storageManager.storeMessageContext(key,createSeqMsg);
-
- // sending the message once through our sender.
- AxisEngine engine = new AxisEngine(createSeqMsg
- .getConfigurationContext());
-
- // message will be stored in the Sandesha2TransportSender
- createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
-
- TransportOutDescription transportOut = createSeqMsg.getTransportOut();
-
- createSeqMsg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
- createSeqMsg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
- createSeqMsg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, key);
-
- Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
- createSeqMsg.setTransportOut(sandesha2TransportOutDesc);
-
- try {
- log.info ("Sending create seq msg...");
- engine.send(createSeqMsg);
- } catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
- }
- }
-
- private void processResponseMessage(RMMsgContext rmMsg,
- String internalSequenceId, long messageNumber)
- throws SandeshaException {
-
- MessageContext msg = rmMsg.getMessageContext();
-
- SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
- .getSOAPVersion(rmMsg.getSOAPEnvelope()));
-
- if (rmMsg == null) {
- String message = "Message or reques message is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- AbstractContext context = rmMsg.getContext();
- if (context == null) {
- String message = "Context is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
- ConfigurationContext configurationContext = rmMsg.getMessageContext().getConfigurationContext();
-
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(msg.getConfigurationContext());
-
- SequencePropertyBeanMgr sequencePropertyMgr = storageManager
- .getSequencePropretyBeanMgr();
-
- SenderBeanMgr retransmitterMgr = storageManager
- .getRetransmitterBeanMgr();
-
- SequencePropertyBean toBean = sequencePropertyMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.TO_EPR);
- SequencePropertyBean replyToBean = sequencePropertyMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR);
-
- // again - looks weird in the client side - but consistent
- SequencePropertyBean outSequenceBean = sequencePropertyMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.OUT_SEQUENCE_ID);
-
- if (toBean == null) {
- String message = "To is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- EndpointReference toEPR = new EndpointReference(toBean.getValue());
- EndpointReference replyToEPR = null;
-
- if (replyToBean != null) {
- replyToEPR = new EndpointReference(replyToBean.getValue());
- }
-
- if (toEPR == null || toEPR.getAddress() == null
- || toEPR.getAddress() == "") {
- String message = "To Property has an invalid value";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
- String newToStr = null;
-
- if (msg.isServerSide()) {
- try {
- MessageContext requestMsg = msg.getOperationContext()
- .getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
- if (requestMsg != null) {
- newToStr = requestMsg.getReplyTo().getAddress();
- }
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
- }
-
- if (newToStr != null)
- rmMsg.setTo(new EndpointReference(newToStr));
- else
- rmMsg.setTo(toEPR);
-
- if (replyToEPR != null)
- rmMsg.setReplyTo(replyToEPR);
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
- if (rmVersion==null)
- throw new SandeshaException ("Cant find the rmVersion of the given message");
-
- String rmNamespaceValue = SpecSpecificConstants.getRMNamespaceValue(rmVersion);
-
- Sequence sequence = new Sequence(factory,rmNamespaceValue);
-
- MessageNumber msgNumber = new MessageNumber(factory,rmNamespaceValue);
- msgNumber.setMessageNumber(messageNumber);
- sequence.setMessageNumber(msgNumber);
-
- boolean lastMessage = false;
- // setting last message
- if (msg.isServerSide()) {
- // server side
- String incomingSeqId = internalSequenceId;
- MessageContext requestMsg = null;
-
- try {
- requestMsg = msg.getOperationContext().getMessageContext(
- WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- RMMsgContext reqRMMsgCtx = MsgInitializer
- .initializeMessage(requestMsg);
- Sequence requestSequence = (Sequence) reqRMMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (requestSequence == null) {
- String message = "Request Sequence is null";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
-
- //TODO check for highest msg no.
- long requestMsgNo = requestSequence.getMessageNumber().getMessageNumber();
-
-
- if (requestSequence.getLastMessage() != null) {
- lastMessage = true;
- sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
-
- // saving the last message no.
- SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
- new Long(messageNumber).toString());
- sequencePropertyMgr.insert(lastOutMsgBean);
- }
-
- } else {
- // client side
-
- OperationContext operationContext = msg.getOperationContext();
- if (operationContext != null) {
- Object obj = msg.getProperty(Sandesha2ClientAPI.LAST_MESSAGE);
- if (obj != null && "true".equals(obj)) {
- lastMessage = true;
-
- SequencePropertyBean specVersionBean = sequencePropertyMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.RM_SPEC_VERSION);
- if (specVersionBean==null)
- throw new SandeshaException ("Spec version bean is not set");
- String specVersion = specVersionBean.getValue();
-
- if (SpecSpecificConstants.isLastMessageIndicatorRequired(specVersion))
- sequence.setLastMessage(new LastMessage(factory,rmNamespaceValue));
-
- // saving the last message no.
- SequencePropertyBean lastOutMsgBean = new SequencePropertyBean(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE,
- new Long(messageNumber).toString());
- sequencePropertyMgr.insert(lastOutMsgBean);
- }
- }
- }
-
- AckRequested ackRequested = null;
-
- boolean addAckRequested = false;
- //if (!lastMessage)
- addAckRequested = true; //TODO decide the policy to add the ackRequested tag
-
- // setting the Sequnece id.
- // Set send = true/false depending on the availability of the out
- // sequence id.
- String identifierStr = null;
- if (outSequenceBean == null || outSequenceBean.getValue() == null) {
- identifierStr = Sandesha2Constants.TEMP_SEQUENCE_ID;
-
- } else {
- identifierStr = (String) outSequenceBean.getValue();
- }
-
- Identifier id1 = new Identifier(factory,rmNamespaceValue);
- id1.setIndentifer(identifierStr);
- sequence.setIdentifier(id1);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQUENCE,sequence);
-
- if (addAckRequested) {
- ackRequested = new AckRequested(factory,rmNamespaceValue);
- Identifier id2 = new Identifier(factory,rmNamespaceValue);
- id2.setIndentifer(identifierStr);
- ackRequested.setIdentifier(id2);
- rmMsg.setMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST,
- ackRequested);
- }
-
- try {
- rmMsg.addSOAPEnvelope();
- } catch (AxisFault e1) {
- throw new SandeshaException(e1.getMessage());
- }
-
- //Retransmitter bean entry for the application message
- SenderBean appMsgEntry = new SenderBean();
- String storageKey = SandeshaUtil.getUUID();
-
- appMsgEntry.setMessageContextRefKey(storageKey);
-
- appMsgEntry.setTimeToSend(System.currentTimeMillis());
- appMsgEntry.setMessageID(rmMsg.getMessageId());
- appMsgEntry.setMessageNumber(messageNumber);
- appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- if (outSequenceBean == null || outSequenceBean.getValue() == null) {
- appMsgEntry.setSend(false);
- } else {
- appMsgEntry.setSend(true);
- // Send will be set to true at the sender.
- msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,
- Sandesha2Constants.VALUE_TRUE);
- }
-
- appMsgEntry.setInternalSequenceID(internalSequenceId);
-
- storageManager.storeMessageContext(storageKey,msg);
-
- retransmitterMgr.insert(appMsgEntry);
-
- msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
- Sandesha2Constants.VALUE_FALSE);
-
- // changing the sender. This will set send to true.
- TransportSender sender = msg.getTransportOut().getSender();
-
- if (sender != null) {
- Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc ();
- msg.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
- msg.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,
- msg.getTransportOut());
- msg.setTransportOut(sandesha2TransportOutDesc);
-
- }
-
-
- //increasing the current handler index, so that the message will not be going throught the SandeshaOutHandler again.
- msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex()+1);
-
- //sending the message through, other handlers and the Sandesha2TransportSender so that it get dumped to the storage.
- AxisEngine engine = new AxisEngine (msg.getConfigurationContext());
- try {
- engine.resumeSend(msg);
- } catch (AxisFault e) {
- throw new SandeshaException (e);
- }
-
- }
+ if (msgProcessor!=null)
+ msgProcessor.processOutMessage(rmMsgCtx);
- private long getPreviousMsgNo(ConfigurationContext context,
- String internalSequenceId) throws SandeshaException {
-
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-
- long nextMsgNo = -1;
- if (nextMsgNoBean != null) {
- Long nextMsgNoLng = new Long(nextMsgNoBean.getValue());
- nextMsgNo = nextMsgNoLng.longValue();
- }
-
- return nextMsgNo;
}
-
- private void setNextMsgNo(ConfigurationContext context,
- String internalSequenceId, long msgNo) throws SandeshaException {
-
- if (msgNo<=0) {
- String message = "Message number '" + msgNo + "' is invalid. Has to be larger than zero.";
- throw new SandeshaException (message);
- }
-
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
- SequencePropertyBean nextMsgNoBean = seqPropMgr.retrieve(
- internalSequenceId,
- Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
-
- boolean update = true;
- if (nextMsgNoBean == null) {
- update = false;
- nextMsgNoBean = new SequencePropertyBean();
- nextMsgNoBean.setSequenceID(internalSequenceId);
- nextMsgNoBean.setName(Sandesha2Constants.SequenceProperties.NEXT_MESSAGE_NUMBER);
- }
- nextMsgNoBean.setValue(new Long(msgNo).toString());
- if (update)
- seqPropMgr.update(nextMsgNoBean);
- else
- seqPropMgr.insert(nextMsgNoBean);
+
- }
public QName getName() {
return new QName(Sandesha2Constants.OUT_HANDLER_NAME);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Thu Mar 16 21:01:58 2006
@@ -62,7 +62,7 @@
private Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
AckRequested ackRequested = (AckRequested) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.ACK_REQUEST);
@@ -263,6 +263,10 @@
msgContext.pause();
}
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=386537&r1=386536&r2=386537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Thu Mar 16 21:01:58 2006
@@ -32,10 +32,12 @@
import org.apache.axis2.transport.TransportSender;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.SpecSpecificConstants;
+import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -62,7 +64,7 @@
Log log = LogFactory.getLog(getClass());
- public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public void processInMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
.getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
@@ -73,6 +75,7 @@
}
MessageContext msgCtx = rmMsgCtx.getMessageContext();
+ ConfigurationContext configCtx = msgCtx.getConfigurationContext();
AbstractContext context = rmMsgCtx.getContext();
if (context == null) {
@@ -231,41 +234,48 @@
seqPropMgr.update(allCompletedMsgsBean);
//If all messages up to last message have been acknowledged. Add terminate Sequence message.
- SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
- internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
- if (lastOutMsgBean != null) {
- Long lastOutMsgNoLng = new Long (lastOutMsgBean.getValue());
- if (lastOutMsgNoLng == null) {
- String message = "Invalid object set for the Last Out Message";
- log.debug(message);
- throw new SandeshaException(message);
+// SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
+// internalSequenceId, Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE);
+// if (lastOutMsgBean != null) {
+// Long lastOutMsgNoLng = new Long (lastOutMsgBean.getValue());
+// if (lastOutMsgNoLng == null) {
+// String message = "Invalid object set for the Last Out Message";
+// log.debug(message);
+// throw new SandeshaException(message);
+// }
+//
+// long lastOutMessageNo = lastOutMsgNoLng.longValue();
+// if (lastOutMessageNo <= 0) {
+// String message = "Invalid value set for the last out message";
+// log.debug(message);
+// throw new SandeshaException(message);
+// }
+
+
+
+ //commiting transaction
+ ackTransaction.commit();
+
+ String lastOutMsgNoStr = SandeshaUtil.getSequenceProperty(internalSequenceId,Sandesha2Constants.SequenceProperties.LAST_OUT_MESSAGE_NO,configCtx);
+ if (lastOutMsgNoStr!=null ) {
+ long highestOutMsgNo = 0;
+ if (lastOutMsgNoStr!=null) {
+ highestOutMsgNo = Long.parseLong(lastOutMsgNoStr);
}
- long lastOutMessageNo = lastOutMsgNoLng.longValue();
- if (lastOutMessageNo <= 0) {
- String message = "Invalid value set for the last out message";
- log.debug(message);
- throw new SandeshaException(message);
- }
-
-
- //commiting transaction
- ackTransaction.commit();
-
- boolean complete = SandeshaUtil.verifySequenceCompletion(
- sequenceAck.getAcknowledgementRanges().iterator(),
- lastOutMessageNo);
+ if (highestOutMsgNo>0) {
+ boolean complete = AcknowledgementManager.verifySequenceCompletion (
+ sequenceAck.getAcknowledgementRanges().iterator(),highestOutMsgNo);
- if (complete) {
- addTerminateSequenceMessage(rmMsgCtx, outSequenceId,internalSequenceId);
+ if (complete)
+ TerminateManager.addTerminateSequenceMessage(rmMsgCtx, outSequenceId,internalSequenceId);
}
}
-
+
//stopping the progress of the message further.
rmMsgCtx.pause();
-
-
}
+
private SenderBean getRetransmitterEntry(Collection collection,
long msgNo) {
@@ -279,121 +289,7 @@
return null;
}
- public void addTerminateSequenceMessage(RMMsgContext incomingAckRMMsg,
- String outSequenceId, String internalSequenceId)
- throws SandeshaException {
-
-
- ConfigurationContext configurationContext = incomingAckRMMsg.getMessageContext().getConfigurationContext();
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(configurationContext);
-
- Transaction addTerminateSeqTransaction = storageManager.getTransaction();
-
- SequencePropertyBeanMgr seqPropMgr = storageManager
- .getSequencePropretyBeanMgr();
-
- SequencePropertyBean terminated = seqPropMgr.retrieve(outSequenceId,
- Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
-
- if (terminated != null && terminated.getValue() != null
- && "true".equals(terminated.getValue())) {
- String message = "Terminate was added previously.";
- log.info(message);
- return;
- }
-
- RMMsgContext terminateRMMessage = RMMsgCreator
- .createTerminateSequenceMessage(incomingAckRMMsg, outSequenceId,internalSequenceId);
- terminateRMMessage.setFlow(MessageContext.OUT_FLOW);
- terminateRMMessage.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-
- SequencePropertyBean toBean = seqPropMgr.retrieve(internalSequenceId,
- Sandesha2Constants.SequenceProperties.TO_EPR);
-
- EndpointReference toEPR = new EndpointReference ( toBean.getValue());
- if (toEPR == null) {
- String message = "To EPR has an invalid value";
- throw new SandeshaException(message);
- }
-
- terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
- terminateRMMessage.setFrom(new EndpointReference(
- Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
- terminateRMMessage.setFaultTo(new EndpointReference(
- Sandesha2Constants.WSA.NS_URI_ANONYMOUS));
-
- String rmVersion = SandeshaUtil.getRMVersion(internalSequenceId,configurationContext);
- if (rmVersion==null)
- throw new SandeshaException ("Cant find the rmVersion of the given message");
- terminateRMMessage.setWSAAction(SpecSpecificConstants.getTerminateSequenceAction(rmVersion));
- terminateRMMessage.setSOAPAction(SpecSpecificConstants.getTerminateSequenceSOAPAction(rmVersion));
-
- SequencePropertyBean transportToBean = seqPropMgr.retrieve(internalSequenceId,Sandesha2Constants.SequenceProperties.TRANSPORT_TO);
- if (transportToBean!=null) {
- terminateRMMessage.setProperty(MessageContextConstants.TRANSPORT_URL,transportToBean.getValue());
- }
-
- try {
- terminateRMMessage.addSOAPEnvelope();
- } catch (AxisFault e) {
- throw new SandeshaException(e.getMessage());
- }
-
- String key = SandeshaUtil.getUUID();
-
- SenderBean terminateBean = new SenderBean();
- terminateBean.setMessageContextRefKey(key);
-
-
- storageManager.storeMessageContext(key,terminateRMMessage.getMessageContext());
-
-
- //Set a retransmitter lastSentTime so that terminate will be send with
- // some delay.
- //Otherwise this get send before return of the current request (ack).
- //TODO: refine the terminate delay.
- terminateBean.setTimeToSend(System.currentTimeMillis()
- + Sandesha2Constants.TERMINATE_DELAY);
- terminateBean.setMessageID(terminateRMMessage.getMessageId());
-
- //this will be set to true at the sender.
- terminateBean.setSend(true);
-
- terminateRMMessage.getMessageContext().setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING,
- Sandesha2Constants.VALUE_FALSE);
-
- terminateBean.setReSend(false);
-
- SenderBeanMgr retramsmitterMgr = storageManager
- .getRetransmitterBeanMgr();
-
- retramsmitterMgr.insert(terminateBean);
-
- SequencePropertyBean terminateAdded = new SequencePropertyBean();
- terminateAdded.setName(Sandesha2Constants.SequenceProperties.TERMINATE_ADDED);
- terminateAdded.setSequenceID(outSequenceId);
- terminateAdded.setValue("true");
-
- seqPropMgr.insert(terminateAdded);
-
- //This should be dumped to the storage by the sender
- TransportOutDescription transportOut = terminateRMMessage.getMessageContext().getTransportOut();
- terminateRMMessage.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC,transportOut);
- terminateRMMessage.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY,key);
- terminateRMMessage.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE,Sandesha2Constants.VALUE_TRUE);
- terminateRMMessage.getMessageContext().setTransportOut(new Sandesha2TransportOutDesc ());
- addTerminateSeqTransaction.commit();
-
- AxisEngine engine = new AxisEngine (incomingAckRMMsg.getMessageContext().getConfigurationContext());
- try {
- engine.send(terminateRMMessage.getMessageContext());
- } catch (AxisFault e) {
- throw new SandeshaException (e.getMessage());
- }
-
- }
private static long getNoOfMessagesAcked (Iterator ackRangeIterator) {
long noOfMsgs = 0;
@@ -408,5 +304,9 @@
}
return noOfMsgs;
+ }
+
+ public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org