You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/11/14 10:37:23 UTC
svn commit: r344090 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/
msgprocessors/ policy/ storage/beans/ storage/inmemory/ util/ workers/
Author: chamikara
Date: Mon Nov 14 01:36:19 2005
New Revision: 344090
URL: http://svn.apache.org/viewcvs?rev=344090&view=rev
Log:
Added exponential backoff concept - the retransmission interval doubles after every retransmission.
Added a MAXIMUM_RETRANSMISSION_ATTEMPTS constant. Retransmission will happen at most this number of times.
Added a new class, MessageRetransmissionAdjuster, to update the RetransmitterBean after each retransmission.
Added:
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Nov 14 01:36:19 2005
@@ -116,10 +116,12 @@
}
public interface WSP {
- long RETRANSMISSION_INTERVAL = 500000;
+ long RETRANSMISSION_INTERVAL = 1000;
long ACKNOWLEDGEMENT_INTERVAL = 3000;
- boolean EXPONENTION_BACKOFF = false;
+ boolean EXPONENTION_BACKOFF = true;
long INACTIVITY_TIMEOUT_INTERVAL = 5000000;
+
+ String RM_POLICY_BEAN = "RMPolicyBean";
}
public interface MessageTypes {
@@ -282,5 +284,7 @@
String SANDESHA_DEBUG_MODE = "SandeshaDebugMode";
String STORAGE_MANAGER_IMPL = "org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
+
+ int MAXIMUM_RETRANSMISSION_ATTEMPTS = 5;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon Nov 14 01:36:19 2005
@@ -51,14 +51,24 @@
protected Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
public void invoke(MessageContext msgContext) throws AxisFault {
-
+
//try {
+
+ //Quit the message with minimum processing if not intended for RM.
+ boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage (msgContext);
+ if (!isRMGlobalMessage) {
+ return;
+ }
+
RMMsgContext rmMessageContext = MsgInitializer
.initializeMessage(msgContext);
+
ConfigurationContext context = rmMessageContext.getMessageContext()
.getSystemContext();
+ //context.setProperty (Constants.SANDESHA_DEBUG_MODE,"on");
+
Object debug = context.getProperty(Constants.SANDESHA_DEBUG_MODE);
if (debug != null && "on".equals(debug)) {
System.out.println("DEBUG: SandeshaGlobalInHandler got a '"
@@ -157,8 +167,8 @@
receivedMsgsBean.setValue(receivedMsgStr);
seqPropMgr.update(receivedMsgsBean);
- //ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor ();
- //ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr);
+ ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor ();
+ ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr);
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Nov 14 01:36:19 2005
@@ -41,6 +41,7 @@
import org.apache.sandesha2.SandeshaDynamicProperties;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
@@ -50,6 +51,7 @@
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.RMPolicyManager;
import org.apache.sandesha2.util.SOAPAbstractFactory;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SequenceManager;
@@ -62,6 +64,7 @@
import org.apache.sandesha2.wsrm.SequenceOffer;
import org.apache.wsdl.WSDLConstants;
+
/**
*
* @author chamikara
@@ -134,6 +137,12 @@
//Strating the sender.
SandeshaUtil.startSenderIfStopped(context);
+
+ //Adding the policy bean
+ RMPolicyBean policyBean = RMPolicyManager.getPolicyBean(rmMsgCtx);
+ rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean);
+
+
StorageManager storageManager = SandeshaUtil
.getSandeshaStorageManager(context);
@@ -471,7 +480,7 @@
.getMessageContext());
RetransmitterBean createSeqEntry = new RetransmitterBean();
createSeqEntry.setKey(key);
- createSeqEntry.setLastSentTime(0);
+ createSeqEntry.setTimeToSend(System.currentTimeMillis());
createSeqEntry.setMessageId(createSeqRMMessage.getMessageId());
createSeqEntry.setSend(true);
retransmitterMgr.insert(createSeqEntry);
@@ -661,7 +670,7 @@
String key = SandeshaUtil
.storeMessageContext(rmMsg.getMessageContext());
appMsgEntry.setKey(key);
- appMsgEntry.setLastSentTime(0);
+ appMsgEntry.setTimeToSend(System.currentTimeMillis());
appMsgEntry.setMessageId(rmMsg.getMessageId());
appMsgEntry.setMessageNumber(messageNumber);
if (outSequenceBean == null || outSequenceBean.getValue() == null) {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Nov 14 01:36:19 2005
@@ -41,6 +41,10 @@
public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+// boolean b = true;
+// if (b)
+// return;
+
SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
.getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
if (sequenceAck == null)
@@ -223,7 +227,7 @@
// some delay.
//Otherwise this get send before return of the current request (ack).
//TODO verify that the time given is correct
- terminateBean.setLastSentTime(System.currentTimeMillis()
+ terminateBean.setTimeToSend(System.currentTimeMillis()
+ Constants.TERMINATE_DELAY);
terminateBean.setMessageId(terminateRMMessage.getMessageId());
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java Mon Nov 14 01:36:19 2005
@@ -14,21 +14,26 @@
* limitations under the License.
*/
-package org.apache.sandesha2.policy;
/**
* @author Sanka Samaranayake (sanka@apache.org)
*/
+package org.apache.sandesha2.policy;
+
+import org.apache.sandesha2.Constants;
public class RMPolicyBean {
- private long inactiveTimeoutInterval = -1l;
- private long acknowledgementInterval = -1l;
- private long retransmissionInterval = -1l;
- private boolean exponentialBackoff = false;
+ private long inactiveTimeoutInterval = Constants.WSP.INACTIVITY_TIMEOUT_INTERVAL;
+ private long acknowledgementInterval = Constants.WSP.ACKNOWLEDGEMENT_INTERVAL;
+ private long retransmissionInterval = Constants.WSP.RETRANSMISSION_INTERVAL;
+ private boolean exponentialBackoff = Constants.WSP.EXPONENTION_BACKOFF;
+ public RMPolicyBean () {
+ loadValuesFromPropertyFile ();
+ }
- public RMPolicyBean() {
-
+ private void loadValuesFromPropertyFile () {
+ //TODO load policy values from the file.
}
public long getInactiveTimeoutInterval() {
@@ -43,7 +48,7 @@
return retransmissionInterval;
}
- public boolean getExponentialBackoff() {
+ public boolean isExponentialBackoff() {
return exponentialBackoff;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java Mon Nov 14 01:36:19 2005
@@ -23,7 +23,7 @@
private String key;
- private long LastSentTime;
+// private long LastSentTime;
private boolean Send;
@@ -35,15 +35,18 @@
private boolean reSend = true;
+ private long timeToSend = 0;
+
public RetransmitterBean() {
}
- public RetransmitterBean(String messageId, String key, long lastSentTime,
- boolean send, String tempSequenceId, long messageNumber) {
+ public RetransmitterBean(String messageId, String key,
+ boolean send,long timeToSend, String tempSequenceId, long messageNumber) {
this.messageId = messageId;
this.key = key;
- this.LastSentTime = lastSentTime;
+ //this.LastSentTime = lastSentTime;
+ this.timeToSend = timeToSend;
this.Send = send;
this.tempSequenceId = tempSequenceId;
this.messageNumber = messageNumber;
@@ -57,13 +60,13 @@
this.key = key;
}
- public long getLastSentTime() {
- return LastSentTime;
- }
-
- public void setLastSentTime(long lastSentTime) {
- LastSentTime = lastSentTime;
- }
+// public long getLastSentTime() {
+// return LastSentTime;
+// }
+//
+// public void setLastSentTime(long lastSentTime) {
+// LastSentTime = lastSentTime;
+// }
public String getMessageId() {
return messageId;
@@ -112,5 +115,12 @@
public void setReSend(boolean reSend) {
this.reSend = reSend;
}
-
+
+ public long getTimeToSend() {
+ return timeToSend;
+ }
+
+ public void setTimeToSend(long timeToSend) {
+ this.timeToSend = timeToSend;
+ }
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java Mon Nov 14 01:36:19 2005
@@ -71,6 +71,7 @@
RetransmitterBean temp;
while (iterator.hasNext()) {
+
temp = (RetransmitterBean) iterator.next();
boolean add = true;
@@ -78,8 +79,8 @@
if (bean.getKey() != null && !bean.getKey().equals(temp.getKey()))
add = false;
- if (bean.getLastSentTime() > 0
- && bean.getLastSentTime() != temp.getLastSentTime())
+ if (bean.getTimeToSend() > 0
+ && bean.getTimeToSend() != temp.getTimeToSend())
add = false;
if (bean.getMessageId() != null
@@ -110,11 +111,14 @@
while (iterator.hasNext()) {
temp = (RetransmitterBean) iterator.next();
if (temp.isSend()) {
- long lastSentTime = temp.getLastSentTime();
+
+ long timeToSend = temp.getTimeToSend();
+
int count = temp.getSentCount();
+
long timeNow = System.currentTimeMillis();
if (count == 0
- || (timeNow > (lastSentTime + Constants.WSP.RETRANSMISSION_INTERVAL))) {
+ || (timeNow >= timeToSend)) {
beans.add(temp);
}
}
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=344090&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Mon Nov 14 01:36:19 2005
@@ -0,0 +1,93 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.derby.iapi.sql.dictionary.ConsInfo;
+import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.policy.RMPolicyBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+
+
+/**
+ * @author chamikara
+ */
+
+public class MessageRetransmissionAdjuster {
+
+ public RetransmitterBean adjustRetransmittion (RetransmitterBean retransmitterBean) {
+ String storedKey = (String) retransmitterBean.getKey();
+
+ if (storedKey==null)
+ return retransmitterBean;
+
+ MessageContext messageContext = SandeshaUtil.getStoredMessageContext(storedKey);
+
+ if (messageContext.getSystemContext()==null)
+ return retransmitterBean;
+
+ RMPolicyBean policyBean = (RMPolicyBean) messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN);
+ if (policyBean==null){
+ return retransmitterBean;
+ }
+
+ long oldRetransmissionTime = retransmitterBean.getTimeToSend();
+
+ retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1);
+ adjustNextRetransmissionTime (retransmitterBean,policyBean);
+
+ if (retransmitterBean.getSentCount()>=Constants.MAXIMUM_RETRANSMISSION_ATTEMPTS)
+ stopRetransmission (retransmitterBean);
+
+ return retransmitterBean;
+ }
+
+ private RetransmitterBean adjustNextRetransmissionTime (RetransmitterBean retransmitterBean,RMPolicyBean policyBean) {
+
+ long lastSentTime = retransmitterBean.getTimeToSend();
+
+ int count = retransmitterBean.getSentCount();
+
+ long baseInterval = policyBean.getRetransmissionInterval();
+
+ long timeToSendNext;
+ if (policyBean.isExponentialBackoff()) {
+ long newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
+ retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+ }
+
+ return retransmitterBean;
+ }
+
+ private void stopRetransmission (RetransmitterBean bean) {
+ bean.setReSend(false);
+ }
+
+
+ //TODO: Have to change this to be plugable
+ private long generateNextExponentialBackedoffDifference(int count,long initialInterval) {
+ long interval = initialInterval;
+ for (int i=1;i<=count;i++){
+ interval = interval*2;
+ }
+
+ return interval;
+ }
+
+
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Mon Nov 14 01:36:19 2005
@@ -38,6 +38,7 @@
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
@@ -56,6 +57,14 @@
public class RMMsgCreator {
+
+ private static void setUpMessage (MessageContext rmMsgCtx) {
+ //Seting RMPolicyBean
+ RMPolicyBean policyBean = new RMPolicyBean ();
+ rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean);
+
+ }
+
public static RMMsgContext createCreateSeqMsg(
RMMsgContext applicationRMMsg, String tempSequenceId, String acksTo)
throws SandeshaException {
@@ -94,6 +103,8 @@
createSeqmsgContext.setServiceContextID(applicationMsgContext
.getServiceContextID());
+ setUpMessage(createSeqmsgContext);
+
String createSeqMsgId = SandeshaUtil.getUUID();
try {
AxisOperation appMsgOperationDesc = applicationMsgContext
@@ -234,6 +245,8 @@
if (terminateMessage == null)
throw new SandeshaException("MessageContext is null");
+ setUpMessage(terminateMessage);
+
SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion ( referenceMessage.getEnvelope()));
MessageInformationHeaders newMessageInfoHeaders = new MessageInformationHeaders();
@@ -338,11 +351,14 @@
outMessage.setWSAAction(Constants.WSRM.Actions.ACTION_CREATE_SEQUENCE_RESPONSE);
outMessage.setSoapAction(Constants.WSRM.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE);
+
String newMessageId = SandeshaUtil.getUUID();
outMessage.setMessageID(newMessageId);
outMessage.setEnvelope(envelope);
+ setUpMessage(outMessage);
+
RMMsgContext createSeqResponse = null;
try {
createSeqResponse = MsgInitializer.initializeMessage(outMessage);
@@ -419,6 +435,9 @@
.getAxisOperation());
ackMsgCtx.setOperationContext(ackOpCtx);
+
+ setUpMessage(ackMsgCtx);
+
ackOpCtx.addMessageContext(ackMsgCtx);
Sequence reqSequence = (Sequence) applicationRMMsgCtx
Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java?rev=344090&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java Mon Nov 14 01:36:19 2005
@@ -0,0 +1,30 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
+import org.apache.sandesha2.policy.RMPolicyBean;
+
+public class RMPolicyManager {
+
+ public static RMPolicyBean getPolicyBean (RMMsgContext msgContext) {
+ RMPolicyBean policyBean = new RMPolicyBean ();
+ return policyBean;
+ }
+}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Nov 14 01:36:19 2005
@@ -22,6 +22,8 @@
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
+
+import javax.xml.namespace.QName;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamReader;
import org.apache.axis2.AxisFault;
@@ -31,6 +33,7 @@
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.om.OMElement;
import org.apache.axis2.om.impl.llom.builder.StAXBuilder;
import org.apache.axis2.om.impl.llom.builder.StAXOMBuilder;
import org.apache.axis2.soap.SOAP11Constants;
@@ -496,5 +499,24 @@
return Constants.SOAPVersion.v1_2;
else
throw new SandeshaException ("Unknown SOAP version");
+ }
+
+ public static boolean isRMGlobalMessage (MessageContext msgCtx) {
+ boolean rmGlobalMsg = false;
+
+ String action = msgCtx.getWSAAction();
+ SOAPEnvelope env = msgCtx.getEnvelope();
+ OMElement sequenceElem = env.getFirstChildWithName(new QName (Constants.WSRM.NS_URI_RM,Constants.WSRM.SEQUENCE));
+
+ if (sequenceElem!=null)
+ rmGlobalMsg = true;
+
+ if (Constants.WSRM.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT.equals(action))
+ rmGlobalMsg = true;
+
+ if (Constants.WSRM.Actions.ACTION_TERMINATE_SEQUENCE.equals(action))
+ rmGlobalMsg = true;
+
+ return rmGlobalMsg;
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Nov 14 01:36:19 2005
@@ -29,6 +29,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
@@ -61,7 +62,9 @@
.getRetransmitterBeanMgr();
Collection coll = mgr.findMsgsToSend();
Iterator iter = coll.iterator();
+
while (iter.hasNext()) {
+
RetransmitterBean bean = (RetransmitterBean) iter.next();
String key = (String) bean.getKey();
MessageContext msgCtx = SandeshaUtil
@@ -85,7 +88,9 @@
}
new AxisEngine(context).send(msgCtx);
-
+ MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster ();
+ retransmitterAdjuster.adjustRetransmittion(bean);
+
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
@@ -96,8 +101,8 @@
}
//changing the values of the sent bean.
- bean.setLastSentTime(System.currentTimeMillis());
- bean.setSentCount(bean.getSentCount() + 1);
+ //bean.setLastSentTime(System.currentTimeMillis());
+ //bean.setSentCount(bean.getSentCount() + 1);
//update if resend=true otherwise delete. (reSend=false
// means
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org