You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/11/19 18:37:17 UTC
svn commit: r345658 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: Constants.java
msgprocessors/ApplicationMsgProcessor.java
storage/inmemory/InMemoryRetransmitterBeanMgr.java
util/MessageRetransmissionAdjuster.java workers/Sender.java
Author: chamikara
Date: Sat Nov 19 09:36:36 2005
New Revision: 345658
URL: http://svn.apache.org/viewcvs?rev=345658&view=rev
Log:
Async Acks were made to be sent as standalone only after waiting for an given wsp:acknowledgementInterval. Unwanted ack entries will be deleted (for e.g. ack 1-2 (yet to be send) will be deleted when adding ack 1-3)
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Sat Nov 19 09:36:36 2005
@@ -118,8 +118,8 @@
}
public interface WSP {
- long RETRANSMISSION_INTERVAL = 1000;
- long ACKNOWLEDGEMENT_INTERVAL = 3000;
+ long RETRANSMISSION_INTERVAL = 20000;
+ long ACKNOWLEDGEMENT_INTERVAL = 4000;
boolean EXPONENTION_BACKOFF = true;
long INACTIVITY_TIMEOUT_INTERVAL = 5000000;
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Sat Nov 19 09:36:36 2005
@@ -18,6 +18,9 @@
package org.apache.sandesha2.msgprocessors;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
import javax.xml.namespace.QName;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -34,9 +37,11 @@
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.axis2.soap.SOAPFactory;
+import org.apache.derby.tools.sysinfo;
import org.apache.sandesha2.Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
@@ -55,6 +60,8 @@
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.wsdl.WSDLConstants;
+import com.sun.rsasign.p;
+
public class ApplicationMsgProcessor implements MsgProcessor {
private boolean letInvoke = false;
@@ -302,6 +309,8 @@
RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+ ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
ackMsgCtx.setAxisServiceGroup(serviceGroup);
ackMsgCtx.setServiceGroupContext(serviceGroupContext);
ackMsgCtx.setAxisService(service);
@@ -389,7 +398,35 @@
ackBean.setReSend(false);
ackBean.setSend(true);
ackBean.setMessagetype(Constants.MessageTypes.ACK);
-
+
+ //the tempSequenceId value of the retransmitter Table for the messages related to an incoming
+ //sequence is the actual sequence ID - TODO document this.
+ ackBean.setTempSequenceId(sequenceId);
+
+ RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx.getProperty(Constants.WSP.RM_POLICY_BEAN);
+ long ackInterval = Constants.WSP.ACKNOWLEDGEMENT_INTERVAL;
+ if (policyBean!=null) {
+ ackInterval = policyBean.getAcknowledgementInaterval();
+ }
+
+ //Ack will be sent as stand alone, only after the retransmitter interval.
+ long timeToSend = System.currentTimeMillis()+ackInterval;
+ ackBean.setTimeToSend(timeToSend);
+
+
+ //removing old acks.
+ RetransmitterBean findBean = new RetransmitterBean ();
+ findBean.setMessagetype(Constants.MessageTypes.ACK);
+ findBean.setTempSequenceId(sequenceId);
+ Collection coll = retransmitterBeanMgr.find(findBean);
+ Iterator it = coll.iterator();
+ while (it.hasNext()) {
+ RetransmitterBean retransmitterBean = (RetransmitterBean) it.next();
+ retransmitterBeanMgr.delete(retransmitterBean.getMessageId());
+ }
+
+
+ //inserting the new ack.
retransmitterBeanMgr.insert(ackBean);
SandeshaUtil.startSenderIfStopped(configCtx);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemoryRetransmitterBeanMgr.java Sat Nov 19 09:36:36 2005
@@ -115,13 +115,9 @@
temp = (RetransmitterBean) iterator.next();
if (temp.isSend()) {
- long timeToSend = temp.getTimeToSend();
-
- int count = temp.getSentCount();
-
+ long timeToSend = temp.getTimeToSend();
long timeNow = System.currentTimeMillis();
- if (count == 0
- || (timeNow >= timeToSend)) {
+ if ((timeNow >= timeToSend)) {
beans.add(temp);
}
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/MessageRetransmissionAdjuster.java Sat Nov 19 09:36:36 2005
@@ -20,6 +20,7 @@
import org.apache.axis2.context.MessageContext;
import org.apache.derby.iapi.sql.dictionary.ConsInfo;
import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.SandeshaDynamicProperties;
import org.apache.sandesha2.policy.RMPolicyBean;
import org.apache.sandesha2.storage.beans.RetransmitterBean;
@@ -43,11 +44,9 @@
RMPolicyBean policyBean = (RMPolicyBean) messageContext.getProperty(Constants.WSP.RM_POLICY_BEAN);
if (policyBean==null){
- return retransmitterBean;
+ policyBean = new SandeshaDynamicProperties().getPolicyBean();
}
- long oldRetransmissionTime = retransmitterBean.getTimeToSend();
-
retransmitterBean.setSentCount(retransmitterBean.getSentCount()+1);
adjustNextRetransmissionTime (retransmitterBean,policyBean);
@@ -65,12 +64,13 @@
long baseInterval = policyBean.getRetransmissionInterval();
- long timeToSendNext;
+ long newInterval = baseInterval;
if (policyBean.isExponentialBackoff()) {
- long newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
- retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+ newInterval = generateNextExponentialBackedoffDifference (count,baseInterval);
}
+ retransmitterBean.setTimeToSend(lastSentTime+newInterval);
+
return retransmitterBean;
}
@@ -82,7 +82,7 @@
//TODO: Have to change this to be plugable
private long generateNextExponentialBackedoffDifference(int count,long initialInterval) {
long interval = initialInterval;
- for (int i=1;i<=count;i++){
+ for (int i=1;i<count;i++){
interval = interval*2;
}
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=345658&r1=345657&r2=345658&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sat Nov 19 09:36:36 2005
@@ -87,9 +87,18 @@
+ "' message.");
}
- new AxisEngine(context).send(msgCtx);
+ try {
+ new AxisEngine(context).send(msgCtx);
+ }catch (Exception e) {
+ //Exception is sending. retry later
+ System.out.println("Exception thrown in sending...");
+ e.printStackTrace();
+ }
+
MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster ();
retransmitterAdjuster.adjustRetransmittion(bean);
+
+ mgr.update(bean);
if (!msgCtx.isServerSide())
checkForSyncResponses(msgCtx);
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org