You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/12/27 14:07:19 UTC
svn commit: r359208 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/
msgprocessors/ storage/ storage/inmemory/ util/ workers/
Author: chamikara
Date: Tue Dec 27 05:06:25 2005
New Revision: 359208
URL: http://svn.apache.org/viewcvs?rev=359208&view=rev
Log:
Bug fixes.
Corrected inactivity timeout logic.
Some changes to improve the performance.
Corrections in the transaction logic.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Tue Dec 27 05:06:25 2005
@@ -23,6 +23,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -41,7 +42,8 @@
public class AcknowledgementManager {
/**
- * Piggybacks any available acks of the same sequence to the given application message.
+ * Piggybacks any available acks of the same sequence to the given
+ * application message.
*
* @param applicationRMMsgContext
* @throws SandeshaException
@@ -52,6 +54,7 @@
.getMessageContext().getConfigurationContext();
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(configurationContext);
+
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager
@@ -68,7 +71,8 @@
String sequenceId = sequence.getIdentifier().getIdentifier();
SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr
- .retrieve(sequenceId,
+ .retrieve(
+ sequenceId,
Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
if (internalSequenceBean == null)
throw new SandeshaException("Temp Sequence is not set");
@@ -82,27 +86,32 @@
Iterator it = collection.iterator();
if (it.hasNext()) {
+
SenderBean ackBean = (SenderBean) it.next();
- //deleting the ack entry.
- retransmitterBeanMgr.delete(ackBean.getMessageID());
+ long timeNow = System.currentTimeMillis();
+ if (ackBean.getTimeToSend() > timeNow) { //Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
- //Adding the ack to the application message
- MessageContext ackMsgContext = SandeshaUtil
- .getStoredMessageContext(ackBean.getMessageContextRefKey());
- RMMsgContext ackRMMsgContext = MsgInitializer
- .initializeMessage(ackMsgContext);
- if (ackRMMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK)
- throw new SandeshaException("Invalid ack message entry");
-
- SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgContext
- .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
- applicationRMMsgContext.setMessagePart(
- Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
- sequenceAcknowledgement);
+ //deleting the ack entry.
+ retransmitterBeanMgr.delete(ackBean.getMessageID());
- applicationRMMsgContext.addSOAPEnvelope();
- }
+ //Adding the ack to the application message
+ MessageContext ackMsgContext = SandeshaUtil
+ .getStoredMessageContext(ackBean
+ .getMessageContextRefKey());
+ RMMsgContext ackRMMsgContext = MsgInitializer
+ .initializeMessage(ackMsgContext);
+ if (ackRMMsgContext.getMessageType() != Sandesha2Constants.MessageTypes.ACK)
+ throw new SandeshaException("Invalid ack message entry");
+
+ SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) ackRMMsgContext
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+ applicationRMMsgContext.setMessagePart(
+ Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+ sequenceAcknowledgement);
+ applicationRMMsgContext.addSOAPEnvelope();
+ }
+ }
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Tue Dec 27 05:06:25 2005
@@ -178,6 +178,8 @@
String OFFERED_SEQUENCE = "OfferedSequence";
String TERMINATE_ADDED = "TerminateAdded";
+
+ String LAST_ACTIVATED_TIME = "LastActivatedTime";
}
public interface SOAPVersion {
@@ -298,11 +300,11 @@
int INVOKER_SLEEP_TIME = 1000;
- int SENDER_SLEEP_TIME = 1000;
+ int SENDER_SLEEP_TIME = 500;
int CLIENT_SLEEP_TIME = 10000;
- int TERMINATE_DELAY = 1000;
+ int TERMINATE_DELAY = 100;
String TEMP_SEQUENCE_ID = "uuid:tempID";
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/SandeshaModule.java Tue Dec 27 05:06:25 2005
@@ -18,8 +18,11 @@
package org.apache.sandesha2;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.modules.Module;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* The Module class of Sandesha2.
@@ -32,12 +35,21 @@
// initialize the module
public void init(AxisConfiguration axisSystem) throws AxisFault {
-
+ cleanStorage (axisSystem);
}
// shutdown the module
public void shutdown(AxisConfiguration axisSystem) throws AxisFault {
+ }
+
+ private void cleanStorage (AxisConfiguration axisSystem) throws AxisFault {
+
+ ConfigurationContext configurationContext = new ConfigurationContext (axisSystem);
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext);
+
+ storageManager.initStorage();
+
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Tue Dec 27 05:06:25 2005
@@ -85,12 +85,12 @@
*/
public static void terminateAfterInvocation (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
- SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
InvokerBeanMgr storageMapBeanMgr = storageManager.getStorageMapBeanMgr();
//removing storageMap entries
InvokerBean findStorageMapBean = new InvokerBean ();
findStorageMapBean.setSequenceID(sequenceID);
+ findStorageMapBean.setInvoked(true);
Collection collection = storageMapBeanMgr.find(findStorageMapBean);
Iterator iterator = collection.iterator();
while (iterator.hasNext()) {
@@ -98,6 +98,13 @@
storageMapBeanMgr.delete(storageMapBean.getMessageContextRefKey());
}
+ removeReceivingSideProperties(configContext,sequenceID);
+
+ }
+
+ private static void removeReceivingSideProperties (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
SequencePropertyBean allSequenceBean = sequencePropertyBeanMgr.retrieve(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
ArrayList allSequenceList = SandeshaUtil.getArrayListFromString(allSequenceBean.getValue());
allSequenceList.remove(sequenceID);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Tue Dec 27 05:06:25 2005
@@ -218,9 +218,23 @@
acksTo = (String) msgCtx
.getProperty(Sandesha2ClientAPI.AcksTo);
}
+
+ if (msgCtx.isServerSide()) {
+ //we do not set acksTo value to anonymous when the create sequence is send from the server.
+
+ MessageContext requestMessage = operationContext.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if (requestMessage==null) {
+ throw new SandeshaException ("Request message is not present");
+ }
+
+ acksTo = requestMessage.getTo().getAddress();
+
+ } else {
+ if (acksTo == null)
+ acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
+ }
+
- if (acksTo == null)
- acksTo = Sandesha2Constants.WSA.NS_URI_ANONYMOUS;
//If acksTo is not anonymous. Start the listner TODO: verify
if (!Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Dec 27 05:06:25 2005
@@ -36,6 +36,7 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.Nack;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -80,6 +81,9 @@
if (outSequenceId == null || "".equals(outSequenceId))
throw new SandeshaException("OutSequenceId is null");
+ //updating the last activated time of the sequence.
+ SequenceManager.updateLastActivatedTime(outSequenceId,rmMsgCtx.getMessageContext().getConfigurationContext());
+
SequencePropertyBean internalSequenceBean = seqPropMgr.retrieve(
outSequenceId, Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
@@ -154,8 +158,6 @@
addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
internalSequenceId);
}
-
-
}
//stopping the progress of the message further.
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -53,6 +53,7 @@
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.LastMessage;
import org.apache.sandesha2.wsrm.Sequence;
@@ -83,23 +84,28 @@
if (msgCtx == null)
throw new SandeshaException("Message context is null");
- if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
- && rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE)
- .equals("true")) {
+ if (rmMsgCtx
+ .getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
+ && rmMsgCtx.getProperty(
+ Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals(
+ "true")) {
return;
}
- //RM will not rend sync responses. If sync acks are there this will be made true again later.
- if(rmMsgCtx.getMessageContext().getOperationContext()!=null) {
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,Constants.VALUE_FALSE);
+ //RM will not rend sync responses. If sync acks are there this will be
+ // made true again later.
+ if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
+ rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+ Constants.RESPONSE_WRITTEN, Constants.VALUE_FALSE);
}
-
+
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(rmMsgCtx.getMessageContext()
.getConfigurationContext());
-
- Transaction applicationMsgTransaction = storageManager.getTransaction();
-
+
+ Transaction updataMsgStringTransaction = storageManager
+ .getTransaction();
+
SequencePropertyBeanMgr seqPropMgr = storageManager
.getSequencePropretyBeanMgr();
@@ -112,6 +118,9 @@
if (configCtx == null)
throw new SandeshaException("Configuration Context is null");
+ //updating the last activated time of the sequence.
+ SequenceManager.updateLastActivatedTime(sequenceId,configCtx);
+
SequencePropertyBean msgsBean = seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.RECEIVED_MESSAGES);
@@ -127,7 +136,8 @@
// EXACTLY_ONCE.
//msgCtx.pause();
- rmMsgCtx.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
+ rmMsgCtx.getMessageContext().setPausedTrue(
+ new QName(Sandesha2Constants.IN_HANDLER_NAME));
}
@@ -139,7 +149,11 @@
msgsBean.setValue(messagesStr);
seqPropMgr.update(msgsBean);
- sendAckIfNeeded(rmMsgCtx, messagesStr);
+ updataMsgStringTransaction.commit();
+
+
+
+ Transaction invokeTransaction = storageManager.getTransaction();
// Pause the messages bean if not the right message to invoke.
NextMsgBeanMgr mgr = storageManager.getNextMsgBeanMgr();
@@ -152,82 +166,71 @@
long nextMsgno = bean.getNextMsgNoToProcess();
- if (msgCtx.isServerSide()) {
- boolean inOrderInvocation = PropertyManager.getInstance().isInOrderInvocation();
- if (inOrderInvocation) {
- //pause the message
- //msgCtx.pause();
- rmMsgCtx.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
- SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
- .retrieve(
- Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
- Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
-
- if (incomingSequenceListBean == null) {
- ArrayList incomingSequenceList = new ArrayList();
- incomingSequenceListBean = new SequencePropertyBean();
- incomingSequenceListBean
- .setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
- incomingSequenceListBean
- .setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
+ boolean inOrderInvocation = PropertyManager.getInstance()
+ .isInOrderInvocation();
+ if (inOrderInvocation) {
+ //pause the message
+ //msgCtx.pause();
+ rmMsgCtx.getMessageContext().setPausedTrue(
+ new QName(Sandesha2Constants.IN_HANDLER_NAME));
+ SequencePropertyBean incomingSequenceListBean = (SequencePropertyBean) seqPropMgr
+ .retrieve(
+ Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+ Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+
+ if (incomingSequenceListBean == null) {
+ ArrayList incomingSequenceList = new ArrayList();
+ incomingSequenceListBean = new SequencePropertyBean();
+ incomingSequenceListBean
+ .setSequenceID(Sandesha2Constants.SequenceProperties.ALL_SEQUENCES);
+ incomingSequenceListBean
+ .setName(Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+ incomingSequenceListBean.setValue(incomingSequenceList
+ .toString());
- seqPropMgr.insert(incomingSequenceListBean);
- }
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
- ArrayList incomingSequenceList = SandeshaUtil.getArrayListFromString(incomingSequenceListBean
- .getValue());
+ ArrayList incomingSequenceList = SandeshaUtil
+ .getArrayListFromString(incomingSequenceListBean.getValue());
- //Adding current sequence to the incoming sequence List.
- if (!incomingSequenceList.contains(sequenceId)) {
- incomingSequenceList.add(sequenceId);
-
- //saving the property.
- incomingSequenceListBean.setValue(incomingSequenceList.toString());
- seqPropMgr.insert(incomingSequenceListBean);
- }
+ //Adding current sequence to the incoming sequence List.
+ if (!incomingSequenceList.contains(sequenceId)) {
+ incomingSequenceList.add(sequenceId);
+
+ //saving the property.
+ incomingSequenceListBean.setValue(incomingSequenceList
+ .toString());
+ seqPropMgr.insert(incomingSequenceListBean);
+ }
- //saving the message.
- try {
- String key = SandeshaUtil.storeMessageContext(rmMsgCtx
- .getMessageContext());
- storageMapMgr.insert(new InvokerBean(key, msgNo,
- sequenceId));
-
- //This will avoid performing application processing more
- // than
- // once.
- rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
- "true");
+ //saving the message.
+ try {
+ String key = SandeshaUtil.storeMessageContext(rmMsgCtx
+ .getMessageContext());
+ storageMapMgr.insert(new InvokerBean(key, msgNo, sequenceId));
+
+ //This will avoid performing application processing more
+ // than
+ // once.
+ rmMsgCtx.setProperty(
+ Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
- } catch (Exception ex) {
- throw new SandeshaException(ex.getMessage());
- }
+ } catch (Exception ex) {
+ throw new SandeshaException(ex.getMessage());
+ }
- //Starting the invoker if stopped.
- SandeshaUtil.startInvokerIfStopped(msgCtx.getConfigurationContext());
+ //Starting the invoker if stopped.
+ SandeshaUtil
+ .startInvokerIfStopped(msgCtx.getConfigurationContext());
- }
}
-// try {
-// MessageContext requestMessage = rmMsgCtx.getMessageContext()
-// .getOperationContext().getMessageContext(
-// WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-// String requestMessageId = requestMessage.getMessageID();
-// SequencePropertyBean checkResponseBean = seqPropMgr.retrieve(
-// requestMessageId,
-// Sandesha2Constants.SequenceProperties.CHECK_RESPONSE);
-// if (checkResponseBean != null) {
-// checkResponseBean.setValue(msgCtx);
-// seqPropMgr.update(checkResponseBean);
-// }
-//
-// } catch (AxisFault e) {
-// throw new SandeshaException(e.getMessage());
-// }
-
- applicationMsgTransaction.commit();
+ invokeTransaction.commit();
+
+ //Sending acknowledgements
+ sendAckIfNeeded(rmMsgCtx, messagesStr);
+
}
//TODO convert following from INT to LONG
@@ -272,7 +275,7 @@
SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
- EndpointReference acksTo = new EndpointReference (acksToBean.getValue());
+ EndpointReference acksTo = new EndpointReference(acksToBean.getValue());
String acksToStr = acksTo.getAddress();
if (acksToStr == null || messagesStr == null)
@@ -311,9 +314,10 @@
MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(
rmMsgCtx, ackOperation);
-
- ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,"true");
-
+
+ ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE,
+ "true");
+
RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
@@ -351,10 +355,11 @@
}
rmMsgCtx.getMessageContext().getOperationContext().setProperty(
- org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+ org.apache.axis2.Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_TRUE);
- rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN,
- "true");
+ rmMsgCtx.getMessageContext().setProperty(
+ Sandesha2Constants.ACK_WRITTEN, "true");
try {
engine.send(ackRMMsgCtx.getMessageContext());
} catch (AxisFault e1) {
@@ -362,6 +367,8 @@
}
} else {
+ Transaction asyncAckTransaction = storageManager.getTransaction();
+
SenderBeanMgr retransmitterBeanMgr = storageManager
.getRetransmitterBeanMgr();
@@ -380,15 +387,15 @@
RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
.getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
- long ackInterval = PropertyManager.getInstance().getAcknowledgementInterval();
+ long ackInterval = PropertyManager.getInstance()
+ .getAcknowledgementInterval();
if (policyBean != null) {
ackInterval = policyBean.getAcknowledgementInaterval();
}
-
+
//Ack will be sent as stand alone, only after the retransmitter
// interval.
long timeToSend = System.currentTimeMillis() + ackInterval;
- ackBean.setTimeToSend(timeToSend);
//removing old acks.
SenderBean findBean = new SenderBean();
@@ -398,14 +405,19 @@
findBean.setReSend(false);
Collection coll = retransmitterBeanMgr.find(findBean);
Iterator it = coll.iterator();
- while (it.hasNext()) {
- SenderBean retransmitterBean = (SenderBean) it
- .next();
- retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
+
+ if (it.hasNext()) {
+ SenderBean oldAckBean = (SenderBean) it.next();
+ timeToSend = oldAckBean.getTimeToSend(); //If there is an old ack. This ack will be sent in the old timeToSend.
+ retransmitterBeanMgr.delete(oldAckBean.getMessageID());
}
+
+ ackBean.setTimeToSend(timeToSend);
//inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
+
+ asyncAckTransaction.commit();
SandeshaUtil.startSenderIfStopped(configCtx);
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -33,6 +33,7 @@
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.Accept;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.CreateSequenceResponse;
@@ -215,6 +216,8 @@
retransmitterMgr.update(tempBean);
}
+ SequenceManager.updateLastActivatedTime(newOutSequenceId,configCtx);
+
updateAppMessagesTransaction.commit();
createSeqResponseRMMsgCtx.getMessageContext().getOperationContext()
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Tue Dec 27 05:06:25 2005
@@ -28,6 +28,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -70,6 +71,8 @@
TerminateManager.terminateReceivingSide(context,sequenceId);
terminateTransaction.commit();
+
+ SequenceManager.updateLastActivatedTime(sequenceId,context);
//terminateSeqMsg.pause();
terminateSeqRMMSg.getMessageContext().setPausedTrue(new QName (Sandesha2Constants.IN_HANDLER_NAME));
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/StorageManager.java Tue Dec 27 05:06:25 2005
@@ -47,6 +47,8 @@
if (context != null)
this.context = context;
}
+
+ public abstract void initStorage ();
public abstract Transaction getTransaction();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java Tue Dec 27 05:06:25 2005
@@ -79,6 +79,9 @@
if (bean.getSequenceID() != null
&& !bean.getSequenceID().equals(temp.getSequenceID()))
select = false;
+
+ if (bean.isInvoked()!=temp.isInvoked())
+ select = false;
if (select)
beans.add(temp);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Dec 27 05:06:25 2005
@@ -16,17 +16,13 @@
*/
package org.apache.sandesha2.storage.inmemory;
-import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import org.apache.axis2.context.AbstractContext;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.RetransmitterBeanMgrTest;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Dec 27 05:06:25 2005
@@ -74,4 +74,8 @@
return instance;
}
+
+ public void initStorage () {
+
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Tue Dec 27 05:06:25 2005
@@ -82,7 +82,13 @@
baseInterval);
}
- retransmitterBean.setTimeToSend(lastSentTime + newInterval);
+ long newTimeToSend = 0;
+ //newTimeToSend = lastSentTime + newInterval;
+
+ long timeNow = System.currentTimeMillis();
+ newTimeToSend = timeNow + newInterval;
+
+ retransmitterBean.setTimeToSend(newTimeToSend);
return retransmitterBean;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Tue Dec 27 05:06:25 2005
@@ -9,12 +9,15 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.AbstractContext;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.NextMsgBean;
@@ -97,6 +100,8 @@
// message to invoke
//this will apply for only in-order invocations.
+ updateLastActivatedTime(sequenceId,createSequenceMsg.getMessageContext().getConfigurationContext());
+
return sequenceId;
}
@@ -140,4 +145,71 @@
seqPropMgr.insert(acksToBean);
}
+
+ public static void updateLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ Transaction lastActivatedTransaction = storageManager.getTransaction();
+ SequencePropertyBeanMgr sequencePropertyBeanMgr = storageManager.getSequencePropretyBeanMgr();
+ SequencePropertyBean lastActivatedBean = sequencePropertyBeanMgr.retrieve(sequenceID, Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ boolean added = false;
+
+ if (lastActivatedBean==null) {
+ added = true;
+ lastActivatedBean = new SequencePropertyBean ();
+ lastActivatedBean.setSequenceID(sequenceID);
+ lastActivatedBean.setName(Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ }
+
+ long currentTime = System.currentTimeMillis();
+ lastActivatedBean.setValue(Long.toString(currentTime));
+
+ if (added)
+ sequencePropertyBeanMgr.insert(lastActivatedBean);
+ else
+ sequencePropertyBeanMgr.update(lastActivatedBean);
+
+ lastActivatedTransaction.commit();
+ }
+
+ public static long getLastActivatedTime (String sequenceID, ConfigurationContext configContext) throws SandeshaException {
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+
+ long lastActivatedTime = -1;
+
+ if (lastActivatedBean!=null) {
+ lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+ }
+
+ return lastActivatedTime;
+ }
+
+ public static boolean hasSequenceTimedOut (String sequenceID, RMMsgContext rmMsgCtx) throws SandeshaException {
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(rmMsgCtx.getMessageContext().getConfigurationContext());
+ SequencePropertyBeanMgr seqPropBeanMgr = storageManager.getSequencePropretyBeanMgr();
+
+ RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
+ .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
+ if (policyBean == null) {
+ //loading default policies.
+ policyBean = PropertyManager.getInstance().getRMPolicyBean();
+ }
+
+ boolean sequenceTimedOut = false;
+
+ SequencePropertyBean lastActivatedBean = seqPropBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.LAST_ACTIVATED_TIME);
+ if (lastActivatedBean!=null) {
+ long lastActivatedTime = Long.parseLong(lastActivatedBean.getValue());
+ long timeNow = System.currentTimeMillis();
+ if (lastActivatedTime+policyBean.getInactiveTimeoutInterval()<timeNow)
+ sequenceTimedOut = true;
+ }
+
+ return sequenceTimedOut;
+ }
+
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Tue Dec 27 05:06:25 2005
@@ -182,6 +182,7 @@
Sequence sequence = (Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if (sequence.getLastMessage() != null) {
+
TerminateManager.terminateAfterInvocation(
context, sequenceId);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=359208&r1=359207&r2=359208&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Tue Dec 27 05:06:25 2005
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Iterator;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -25,9 +26,9 @@
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.sandesha2.AcknowledgementManager;
+import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
@@ -37,11 +38,14 @@
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.TerminateSequence;
/**
- * This is responsible for sending and re-sending messages of Sandesha2. This represent a thread that keep running all
- * the time. This keep looking at the Sender table to find out any entries that should be sent.
+ * This is responsible for sending and re-sending messages of Sandesha2. This
+ * represents a thread that keeps running all the time. It keeps looking at the
+ * Sender table to find any entries that should be sent.
*
* @author Chamikara Jayalath <ch...@gmail.com>
*/
@@ -63,29 +67,32 @@
public void run() {
StorageManager storageManager = null;
-
+
try {
- storageManager = SandeshaUtil
- .getSandeshaStorageManager(context);
+ storageManager = SandeshaUtil.getSandeshaStorageManager(context);
} catch (SandeshaException e2) {
// TODO Auto-generated catch block
System.out.println("ERROR: Could not start sender");
e2.printStackTrace();
return;
}
-
+
while (senderStarted) {
try {
if (context == null)
throw new SandeshaException(
"Can't continue the Sender. Context is null");
+ Transaction pickMessagesToSendTransaction = storageManager.getTransaction(); //starting
+ // a
+ // new
+ // transaction
- Transaction sendTransaction = storageManager.getTransaction(); //starting a new transaction
-
- SenderBeanMgr mgr = storageManager
- .getRetransmitterBeanMgr();
+ SenderBeanMgr mgr = storageManager.getRetransmitterBeanMgr();
Collection coll = mgr.findMsgsToSend();
+
+ pickMessagesToSendTransaction.commit();
+
Iterator iter = coll.iterator();
while (iter.hasNext()) {
@@ -96,9 +103,10 @@
.getStoredMessageContext(key);
try {
-
- if (msgCtx==null) {
- System.out.println("ERROR: Sender has an Unavailable Message entry");
+
+ if (msgCtx == null) {
+ System.out
+ .println("ERROR: Sender has an Unavailable Message entry");
break;
}
RMMsgContext rmMsgCtx = MsgInitializer
@@ -121,57 +129,100 @@
+ "' message.");
}
}
+
+ Transaction preSendTransaction = storageManager.getTransaction();
- if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+ int messageType = rmMsgCtx.getMessageType();
+
+ if (messageType == Sandesha2Constants.MessageTypes.APPLICATION) {
+
+ Sequence sequence = (Sequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ String sequenceID = sequence.getIdentifier().getIdentifier();
+ //checking whether the sequence has timed out.
+ boolean sequenceTimedOut = SequenceManager.hasSequenceTimedOut (sequenceID, rmMsgCtx);;
+ if (sequenceTimedOut) {
+ //sequence has been timed out.
+ //do time out processing.
+
+ TerminateManager.terminateSendingSide(context,sequenceID);
+ throw new SandeshaException ("Sequence timed out");
+ }
+
//piggybacking if an ack if available for the same
// sequence.
AcknowledgementManager
.piggybackAckIfPresent(rmMsgCtx);
+
}
+
+ preSendTransaction.commit();
try {
- AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
- engine.send(msgCtx);
-// if (msgCtx.isPaused())
-// engine.resumeSend(msgCtx);
-// else
-// engine.send(msgCtx);
+ AxisEngine engine = new AxisEngine(msgCtx
+ .getConfigurationContext());
+ engine.send(msgCtx);
+ // if (msgCtx.isPaused())
+ // engine.resumeSend(msgCtx);
+ // else
+ // engine.send(msgCtx);
+
} catch (Exception e) {
//Exception is sending. retry later
System.out
.println("Exception thrown in sending...");
e.printStackTrace();
+ //e.printStackTrace();
+
}
+
+ Transaction postSendTransaction = storageManager.getTransaction();
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
+
+ if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
+ Sequence sequence = (Sequence) rmMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ long messageNo = sequence.getMessageNumber()
+ .getMessageNumber();
+ }
+
retransmitterAdjuster.adjustRetransmittion(bean);
-// mgr.update(bean);
-
- if (bean.isReSend())
- mgr.update(bean);
- else
- mgr.delete(bean.getMessageID());
-
- sendTransaction.commit(); //commiting the current transaction
+ //update or delete only if the object is still present.
+ SenderBean bean1 = mgr.retrieve(bean.getMessageID());
+ if (bean1 != null) {
+ if (bean.isReSend())
+ mgr.update(bean);
+ else
+ mgr.delete(bean.getMessageID());
+ }
+
+ postSendTransaction.commit(); //commiting the current
+ // transaction
- Transaction processResponseTransaction = storageManager.getTransaction();
+ Transaction processResponseTransaction =
+ storageManager.getTransaction();
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
-
+
processResponseTransaction.commit();
-
- Transaction terminateCleaningTransaction = storageManager.getTransaction();
- if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+
+ Transaction terminateCleaningTransaction = storageManager
+ .getTransaction();
+ if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
//terminate sending side.
- TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
- String sequenceID = terminateSequence.getIdentifier().getIdentifier();
- ConfigurationContext configContext = msgCtx.getConfigurationContext();
-
- TerminateManager.terminateSendingSide(configContext,sequenceID);
+ TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
+ String sequenceID = terminateSequence
+ .getIdentifier().getIdentifier();
+ ConfigurationContext configContext = msgCtx
+ .getConfigurationContext();
+
+ TerminateManager.terminateSendingSide(
+ configContext, sequenceID);
}
-
+
terminateCleaningTransaction.commit();
} catch (AxisFault e1) {
@@ -179,30 +230,15 @@
} catch (Exception e3) {
e3.printStackTrace();
}
-
- //changing the values of the sent bean.
- //bean.setLastSentTime(System.currentTimeMillis());
- //bean.setSentCount(bean.getSentCount() + 1);
-
- //update if resend=true otherwise delete. (reSend=false
- // means
- // send only once).
-// if (bean.isReSend())
-// mgr.update(bean);
-// else
-// mgr.delete(bean.getMessageID());
-
}
-
-
-
+
} catch (SandeshaException e) {
e.printStackTrace();
return;
}
try {
- Thread.sleep(2000);
+ Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
} catch (InterruptedException e1) {
//e1.printStackTrace();
System.out.println("Sender was interupted...");
@@ -248,58 +284,61 @@
}
- private void checkForSyncResponses(MessageContext msgCtx) {
+ private void checkForSyncResponses(MessageContext msgCtx) {
try {
- boolean responsePresent = (msgCtx
- .getProperty(MessageContext.TRANSPORT_IN) != null);
+ boolean responsePresent = (msgCtx
+ .getProperty(MessageContext.TRANSPORT_IN) != null);
- if (responsePresent) {
- //create the response
- MessageContext response = new MessageContext(msgCtx
- .getConfigurationContext(), msgCtx.getSessionContext(), msgCtx
- .getTransportIn(), msgCtx.getTransportOut());
- response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
- .getProperty(MessageContext.TRANSPORT_IN));
-
- response.setServerSide(false);
-
- //If request is REST we assume the response is REST, so set the
- // variable
- response.setDoingREST(msgCtx.isDoingREST());
- response
- .setServiceGroupContextId(msgCtx.getServiceGroupContextId());
- response.setServiceGroupContext(msgCtx.getServiceGroupContext());
- response.setServiceContext(msgCtx.getServiceContext());
- response.setAxisService(msgCtx.getAxisService());
- response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
-
- //setting the in-flow.
- //ArrayList inPhaseHandlers =
- // response.getAxisOperation().getRemainingPhasesInFlow();
- /*
- * if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
- * ArrayList phases =
- * msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
- * response.getAxisOperation().setRemainingPhasesInFlow(phases); }
- */
-
- //Changed following from TransportUtils to SandeshaUtil since op.
- // context is anavailable.
- SOAPEnvelope resenvelope = null;
- resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
- .getEnvelope().getNamespace().getName());
-
-
- if (resenvelope != null) {
- AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext());
- response.setEnvelope(resenvelope);
- engine.receive(response);
+ if (responsePresent) {
+ //create the response
+ MessageContext response = new MessageContext(msgCtx
+ .getConfigurationContext(), msgCtx.getSessionContext(),
+ msgCtx.getTransportIn(), msgCtx.getTransportOut());
+ response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
+ .getProperty(MessageContext.TRANSPORT_IN));
+
+ response.setServerSide(false);
+
+ //If request is REST we assume the response is REST, so set the
+ // variable
+ response.setDoingREST(msgCtx.isDoingREST());
+ response.setServiceGroupContextId(msgCtx
+ .getServiceGroupContextId());
+ response
+ .setServiceGroupContext(msgCtx.getServiceGroupContext());
+ response.setServiceContext(msgCtx.getServiceContext());
+ response.setAxisService(msgCtx.getAxisService());
+ response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
+
+ //setting the in-flow.
+ //ArrayList inPhaseHandlers =
+ // response.getAxisOperation().getRemainingPhasesInFlow();
+ /*
+ * if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
+ * ArrayList phases =
+ * msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
+ * response.getAxisOperation().setRemainingPhasesInFlow(phases); }
+ */
+
+ //Changed following from TransportUtils to SandeshaUtil since
+ // op.
+ // context is unavailable.
+ SOAPEnvelope resenvelope = null;
+ resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
+ .getEnvelope().getNamespace().getName());
+
+ if (resenvelope != null) {
+ AxisEngine engine = new AxisEngine(msgCtx
+ .getConfigurationContext());
+ response.setEnvelope(resenvelope);
+ engine.receive(response);
+ }
}
- }
-
- }catch (Exception e) {
- System.out.println("Exception was throws in processing the sync response...");
+
+ } catch (Exception e) {
+ System.out
+ .println("Exception was throws in processing the sync response...");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org