You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/11/14 10:37:23 UTC

svn commit: r344090 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ policy/ storage/beans/ storage/inmemory/ util/ workers/

Author: chamikara
Date: Mon Nov 14 01:36:19 2005
New Revision: 344090

URL: http://svn.apache.org/viewcvs?rev=344090&view=rev
Log:
Added exponetian backoff concept - Retransmission interval get twice after every retransmission.
Added a MAXUMIM_RETRANSMISSION_COUNT. Retransmission will happen only this number of times.
A new class MessageRetransmissionAdjuster to set the retransmitterBean after retransmission.

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Mon Nov 14 01:36:19 2005
@@ -116,10 +116,12 @@
 	}
 
 	public interface WSP {
-		long RETRANSMISSION_INTERVAL = 500000;
+		long RETRANSMISSION_INTERVAL = 1000;
 		long ACKNOWLEDGEMENT_INTERVAL = 3000;
-		boolean EXPONENTION_BACKOFF = false;
+		boolean EXPONENTION_BACKOFF = true;
 		long INACTIVITY_TIMEOUT_INTERVAL = 5000000;
+		
+		String RM_POLICY_BEAN = "RMPolicyBean";
 	}
 
 	public interface MessageTypes {
@@ -282,5 +284,7 @@
 	String SANDESHA_DEBUG_MODE = "SandeshaDebugMode";
 	
 	String STORAGE_MANAGER_IMPL = "org.apache.sandesha2.storage.inmemory.InMemoryStorageManager";
+	
+	int MAXIMUM_RETRANSMISSION_ATTEMPTS = 5;
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Mon Nov 14 01:36:19 2005
@@ -51,14 +51,24 @@
     protected Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
 	   
 	public void invoke(MessageContext msgContext) throws AxisFault {
-		
+	
 		//try {
+		
+		//Quit the message with minimum processing if not intended for RM.
+		boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage (msgContext);
+		if (!isRMGlobalMessage) {
+			return;
+		}
+		
 		RMMsgContext rmMessageContext = MsgInitializer
 				.initializeMessage(msgContext);
 
+		
 		ConfigurationContext context = rmMessageContext.getMessageContext()
 				.getSystemContext();
 
+		//context.setProperty (Constants.SANDESHA_DEBUG_MODE,"on");
+		
 		Object debug = context.getProperty(Constants.SANDESHA_DEBUG_MODE);
 		if (debug != null && "on".equals(debug)) {
 			System.out.println("DEBUG: SandeshaGlobalInHandler got a '"
@@ -157,8 +167,8 @@
 							receivedMsgsBean.setValue(receivedMsgStr);
 							seqPropMgr.update(receivedMsgsBean);
 								
-							//ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor ();
-							//ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr);
+							ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor ();
+							ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr);
 							
 					}
 				}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Nov 14 01:36:19 2005
@@ -41,6 +41,7 @@
 import org.apache.sandesha2.SandeshaDynamicProperties;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
@@ -50,6 +51,7 @@
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.RMPolicyManager;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
@@ -62,6 +64,7 @@
 import org.apache.sandesha2.wsrm.SequenceOffer;
 import org.apache.wsdl.WSDLConstants;
 
+
 /**
  * 
  * @author chamikara
@@ -134,6 +137,12 @@
 		//Strating the sender.
 		SandeshaUtil.startSenderIfStopped(context);
 
+		
+		//Adding the policy bean
+		RMPolicyBean policyBean = RMPolicyManager.getPolicyBean(rmMsgCtx);
+		rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean);
+		
+		
 		StorageManager storageManager = SandeshaUtil
 				.getSandeshaStorageManager(context);
 
@@ -471,7 +480,7 @@
 				.getMessageContext());
 		RetransmitterBean createSeqEntry = new RetransmitterBean();
 		createSeqEntry.setKey(key);
-		createSeqEntry.setLastSentTime(0);
+		createSeqEntry.setTimeToSend(System.currentTimeMillis());
 		createSeqEntry.setMessageId(createSeqRMMessage.getMessageId());
 		createSeqEntry.setSend(true);
 		retransmitterMgr.insert(createSeqEntry);
@@ -661,7 +670,7 @@
 		String key = SandeshaUtil
 				.storeMessageContext(rmMsg.getMessageContext());
 		appMsgEntry.setKey(key);
-		appMsgEntry.setLastSentTime(0);
+		appMsgEntry.setTimeToSend(System.currentTimeMillis());
 		appMsgEntry.setMessageId(rmMsg.getMessageId());
 		appMsgEntry.setMessageNumber(messageNumber);
 		if (outSequenceBean == null || outSequenceBean.getValue() == null) {

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Nov 14 01:36:19 2005
@@ -41,6 +41,10 @@
 
 	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 
+//		boolean b = true;
+//		if (b)
+//			return;
+		
 		SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
 				.getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
 		if (sequenceAck == null)
@@ -223,7 +227,7 @@
 		// some delay.
 		//Otherwise this get send before return of the current request (ack).
 		//TODO verify that the time given is correct
-		terminateBean.setLastSentTime(System.currentTimeMillis()
+		terminateBean.setTimeToSend(System.currentTimeMillis()
 				+ Constants.TERMINATE_DELAY);
 
 		terminateBean.setMessageId(terminateRMMessage.getMessageId());

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/policy/RMPolicyBean.java Mon Nov 14 01:36:19 2005
@@ -14,21 +14,26 @@
  * limitations under the License.
  */
 
-package org.apache.sandesha2.policy;
 
 /**
  * @author Sanka Samaranayake (sanka@apache.org)
  */
+package org.apache.sandesha2.policy;
+
+import org.apache.sandesha2.Constants;
 
 public class RMPolicyBean {
-    private long inactiveTimeoutInterval = -1l;
-    private long acknowledgementInterval = -1l;
-    private long retransmissionInterval = -1l;
-    private boolean exponentialBackoff = false;
+    private long inactiveTimeoutInterval = Constants.WSP.INACTIVITY_TIMEOUT_INTERVAL;
+    private long acknowledgementInterval = Constants.WSP.ACKNOWLEDGEMENT_INTERVAL;
+    private long retransmissionInterval = Constants.WSP.RETRANSMISSION_INTERVAL;
+    private boolean exponentialBackoff = Constants.WSP.EXPONENTION_BACKOFF;
     
+    public RMPolicyBean () {
+    	loadValuesFromPropertyFile ();
+    }
     
-    public RMPolicyBean() {
-        
+    private void loadValuesFromPropertyFile () {
+    	//TODO load policy values from the file.
     }
     
     public long getInactiveTimeoutInterval() {
@@ -43,7 +48,7 @@
         return retransmissionInterval;
     }
     
-    public boolean getExponentialBackoff() {
+    public boolean isExponentialBackoff() {
         return exponentialBackoff;
     }
     

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/RetransmitterBean.java Mon Nov 14 01:36:19 2005
@@ -23,7 +23,7 @@
 
 	private String key;
 
-	private long LastSentTime;
+//	private long LastSentTime;
 
 	private boolean Send;
 
@@ -35,15 +35,18 @@
 
 	private boolean reSend = true;
 
+	private long timeToSend = 0;
+	
 	public RetransmitterBean() {
 
 	}
 
-	public RetransmitterBean(String messageId, String key, long lastSentTime,
-			boolean send, String tempSequenceId, long messageNumber) {
+	public RetransmitterBean(String messageId, String key,
+			boolean send,long timeToSend, String tempSequenceId, long messageNumber) {
 		this.messageId = messageId;
 		this.key = key;
-		this.LastSentTime = lastSentTime;
+		//this.LastSentTime = lastSentTime;
+		this.timeToSend = timeToSend;
 		this.Send = send;
 		this.tempSequenceId = tempSequenceId;
 		this.messageNumber = messageNumber;
@@ -57,13 +60,13 @@
 		this.key = key;
 	}
 
-	public long getLastSentTime() {
-		return LastSentTime;
-	}
-
-	public void setLastSentTime(long lastSentTime) {
-		LastSentTime = lastSentTime;
-	}
+//	public long getLastSentTime() {
+//		return LastSentTime;
+//	}
+//
+//	public void setLastSentTime(long lastSentTime) {
+//		LastSentTime = lastSentTime;
+//	}
 
 	public String getMessageId() {
 		return messageId;
@@ -112,5 +115,12 @@
 	public void setReSend(boolean reSend) {
 		this.reSend = reSend;
 	}
-
+	
+	public long getTimeToSend() {
+		return timeToSend;
+	}
+	
+	public void setTimeToSend(long timeToSend) {
+		this.timeToSend = timeToSend;
+	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java Mon Nov 14 01:36:19 2005
@@ -71,6 +71,7 @@
 
 		RetransmitterBean temp;
 		while (iterator.hasNext()) {
+			
 			temp = (RetransmitterBean) iterator.next();
 
 			boolean add = true;
@@ -78,8 +79,8 @@
 			if (bean.getKey() != null && !bean.getKey().equals(temp.getKey()))
 				add = false;
 
-			if (bean.getLastSentTime() > 0
-					&& bean.getLastSentTime() != temp.getLastSentTime())
+			if (bean.getTimeToSend() > 0
+					&& bean.getTimeToSend() != temp.getTimeToSend())
 				add = false;
 
 			if (bean.getMessageId() != null
@@ -110,11 +111,14 @@
 		while (iterator.hasNext()) {
 			temp = (RetransmitterBean) iterator.next();
 			if (temp.isSend()) {
-				long lastSentTime = temp.getLastSentTime();
+				
+				long timeToSend = temp.getTimeToSend();
+				
 				int count = temp.getSentCount();
+				
 				long timeNow = System.currentTimeMillis();
 				if (count == 0
-						|| (timeNow > (lastSentTime + Constants.WSP.RETRANSMISSION_INTERVAL))) {
+						|| (timeNow >= timeToSend)) {
 					beans.add(temp);
 				}
 			}

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=344090&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Mon Nov 14 01:36:19 2005
@@ -0,0 +1,93 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.axis2.context.MessageContext;
+import org.apache.derby.iapi.sql.dictionary.ConsInfo;
+import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.policy.RMPolicyBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
+
+
+/**
+ * @author chamikara
+ */
+
+public class MessageRetransmissionAdjuster {
+
+	public RetransmitterBean adjustRetransmittion (RetransmitterBean retransmitterBean) {
+		String storedKey = (String) retransmitterBean.getKey();
+		
+		if (storedKey==null)
+			return retransmitterBean;
+		
+		MessageContext messageContext = SandeshaUtil.getStoredMessageContext(storedKey);
+		
+		if (messageContext.getSystemContext()==null)
+			return retransmitterBean;
+		
+		RMPolicyBean policyBean = (RMPolicyBean) messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN);
+		if (policyBean==null){
+			return retransmitterBean;
+		}
+		
+		long oldRetransmissionTime = retransmitterBean.getTimeToSend();
+		
+		retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1);
+		adjustNextRetransmissionTime (retransmitterBean,policyBean);
+		
+		if (retransmitterBean.getSentCount()>=Constants.MAXIMUM_RETRANSMISSION_ATTEMPTS)
+			stopRetransmission (retransmitterBean);
+		
+		return retransmitterBean;
+	}
+	
+	private RetransmitterBean adjustNextRetransmissionTime (RetransmitterBean retransmitterBean,RMPolicyBean policyBean) {
+		
+		long lastSentTime = retransmitterBean.getTimeToSend();
+		
+		int count = retransmitterBean.getSentCount();
+		
+		long baseInterval = policyBean.getRetransmissionInterval();
+		
+		long timeToSendNext;
+		if (policyBean.isExponentialBackoff()) {
+			long newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
+			retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+		}
+		
+		return retransmitterBean;
+	}
+	
+	private void stopRetransmission (RetransmitterBean bean) {
+		bean.setReSend(false);
+	}
+	
+	
+	//TODO: Have to change this to be plugable
+	private long generateNextExponentialBackedoffDifference(int count,long initialInterval) {
+		long interval = initialInterval;
+		for (int i=1;i<=count;i++){
+			interval = interval*2;
+		}
+		
+		return interval;
+	}
+	
+	
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Mon Nov 14 01:36:19 2005
@@ -38,6 +38,7 @@
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
@@ -56,6 +57,14 @@
 
 public class RMMsgCreator {
 
+	
+	private static void setUpMessage (MessageContext rmMsgCtx) {
+		//Seting RMPolicyBean
+		RMPolicyBean policyBean = new RMPolicyBean ();
+		rmMsgCtx.setProperty(Constants.WSP.RM_POLICY_BEAN,policyBean);
+		
+	}
+	
 	public static RMMsgContext createCreateSeqMsg(
 			RMMsgContext applicationRMMsg, String tempSequenceId, String acksTo)
 			throws SandeshaException {
@@ -94,6 +103,8 @@
 		createSeqmsgContext.setServiceContextID(applicationMsgContext
 				.getServiceContextID());
 
+		setUpMessage(createSeqmsgContext);
+		
 		String createSeqMsgId = SandeshaUtil.getUUID();
 		try {
 			AxisOperation appMsgOperationDesc = applicationMsgContext
@@ -234,6 +245,8 @@
 		if (terminateMessage == null)
 			throw new SandeshaException("MessageContext is null");
 
+		setUpMessage(terminateMessage);
+		
 		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion ( referenceMessage.getEnvelope()));
 
 		MessageInformationHeaders newMessageInfoHeaders = new MessageInformationHeaders();
@@ -338,11 +351,14 @@
 		outMessage.setWSAAction(Constants.WSRM.Actions.ACTION_CREATE_SEQUENCE_RESPONSE);
 		outMessage.setSoapAction(Constants.WSRM.Actions.SOAP_ACTION_CREATE_SEQUENCE_RESPONSE);
 
+		
 		String newMessageId = SandeshaUtil.getUUID();
 		outMessage.setMessageID(newMessageId);
 
 		outMessage.setEnvelope(envelope);
 
+		setUpMessage(outMessage);
+		
 		RMMsgContext createSeqResponse = null;
 		try {
 			createSeqResponse = MsgInitializer.initializeMessage(outMessage);
@@ -419,6 +435,9 @@
 					.getAxisOperation());
 
 			ackMsgCtx.setOperationContext(ackOpCtx);
+			
+			setUpMessage(ackMsgCtx);
+			
 			ackOpCtx.addMessageContext(ackMsgCtx);
 
 			Sequence reqSequence = (Sequence) applicationRMMsgCtx

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java?rev=344090&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMPolicyManager.java Mon Nov 14 01:36:19 2005
@@ -0,0 +1,30 @@
+/*
+ * Copyright  1999-2004 The Apache Software Foundation.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.sandesha2.util;
+
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
+import org.apache.sandesha2.policy.RMPolicyBean;
+
+public class RMPolicyManager {
+
+	public static RMPolicyBean getPolicyBean (RMMsgContext msgContext) {
+		RMPolicyBean policyBean = new RMPolicyBean ();
+		return policyBean;
+	}
+}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Nov 14 01:36:19 2005
@@ -22,6 +22,8 @@
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
+
+import javax.xml.namespace.QName;
 import javax.xml.stream.XMLInputFactory;
 import javax.xml.stream.XMLStreamReader;
 import org.apache.axis2.AxisFault;
@@ -31,6 +33,7 @@
 import org.apache.axis2.description.TransportInDescription;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.om.OMElement;
 import org.apache.axis2.om.impl.llom.builder.StAXBuilder;
 import org.apache.axis2.om.impl.llom.builder.StAXOMBuilder;
 import org.apache.axis2.soap.SOAP11Constants;
@@ -496,5 +499,24 @@
 			return Constants.SOAPVersion.v1_2;
 		else
 			throw new SandeshaException ("Unknown SOAP version");
+	}
+	
+	public static boolean isRMGlobalMessage (MessageContext msgCtx) {
+		boolean rmGlobalMsg = false;
+		
+		String action = msgCtx.getWSAAction();
+		SOAPEnvelope env = msgCtx.getEnvelope();
+		OMElement sequenceElem = env.getFirstChildWithName(new QName (Constants.WSRM.NS_URI_RM,Constants.WSRM.SEQUENCE));
+		
+		if (sequenceElem!=null)
+			rmGlobalMsg = true;
+		
+		if (Constants.WSRM.Actions.ACTION_SEQUENCE_ACKNOWLEDGEMENT.equals(action))
+			rmGlobalMsg = true;
+		
+		if (Constants.WSRM.Actions.ACTION_TERMINATE_SEQUENCE.equals(action))
+			rmGlobalMsg = true;
+		
+		return rmGlobalMsg;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=344090&r1=344089&r2=344090&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Nov 14 01:36:19 2005
@@ -29,6 +29,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
 import org.apache.sandesha2.storage.beans.RetransmitterBean;
+import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 
@@ -61,7 +62,9 @@
 						.getRetransmitterBeanMgr();
 				Collection coll = mgr.findMsgsToSend();
 				Iterator iter = coll.iterator();
+				
 				while (iter.hasNext()) {
+					
 					RetransmitterBean bean = (RetransmitterBean) iter.next();
 					String key = (String) bean.getKey();
 					MessageContext msgCtx = SandeshaUtil
@@ -85,7 +88,9 @@
 						}
 						
 						new AxisEngine(context).send(msgCtx);
-
+						MessageRetransmissionAdjuster  retransmitterAdjuster = new MessageRetransmissionAdjuster ();
+						retransmitterAdjuster.adjustRetransmittion(bean);
+						
 						if (!msgCtx.isServerSide())
 							checkForSyncResponses(msgCtx);
 
@@ -96,8 +101,8 @@
 					}
 
 					//changing the values of the sent bean.
-					bean.setLastSentTime(System.currentTimeMillis());
-					bean.setSentCount(bean.getSentCount() + 1);
+					//bean.setLastSentTime(System.currentTimeMillis());
+					//bean.setSentCount(bean.getSentCount() + 1);
 
 					//update if resend=true otherwise delete. (reSend=false
 					// means



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org