You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/02/02 20:29:58 UTC

svn commit: r502698 - in /webservices/sandesha/trunk/java: ./ config/ src/org/apache/sandesha2/ src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/i18n/ src/org/apache/sandesha2/msgprocessors/ src/org/apache/sandesha2/policy/ src/org/apache/sa...

Author: mlovett
Date: Fri Feb  2 11:29:56 2007
New Revision: 502698

URL: http://svn.apache.org/viewvc?view=rev&rev=502698
Log:
Add a config option to use message serialization, see SANDESHA2-70

Added:
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java   (with props)
Modified:
    webservices/sandesha/trunk/java/config/module.xml
    webservices/sandesha/trunk/java/maven.xml
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
    webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java

Modified: webservices/sandesha/trunk/java/config/module.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Fri Feb  2 11:29:56 2007
@@ -116,6 +116,7 @@
 				  <sandesha2:UseRMAnonURI>true</sandesha2:UseRMAnonURI>
 				</sandesha2:MakeConnection>
 				
+				<!-- <sandesha2:UseMessageSerialization>true</sandesha2:UseMessageSerialization> -->
 			</wsp:Policy>
 		</sandesha2:RMAssertion>
 	</wsp:Policy>

Modified: webservices/sandesha/trunk/java/maven.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/maven.xml?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/maven.xml (original)
+++ webservices/sandesha/trunk/java/maven.xml Fri Feb  2 11:29:56 2007
@@ -194,10 +194,41 @@
 		<delete dir="${build.temp.dir}"/>
 	</goal>
 
+	<goal name="serialize:create" prereqs="server:create,client:create">
+		<ant:property name="serialize.client.path" value="${build.repo.dir}/serialize-client"/>
+		<ant:property name="serialize.server.path" value="${build.repo.dir}/serialize-server"/>
+         
+		<ant:property name="serialize.temp.path" value="${build.temp.dir}/serialize" />
+		<ant:mkdir dir="${serialize.temp.path}" />
+				
+		<copy todir="${serialize.client.path}">
+		  <fileset dir="${build.repo.dir}/client">
+		    <exclude name="modules/${mar.name}"/>
+		  </fileset>
+		</copy>
+
+		<copy todir="${serialize.server.path}">
+		  <fileset dir="${build.repo.dir}/server">
+		    <exclude name="modules/${mar.name}"/>
+		  </fileset>
+		</copy>
+
+		<!-- Switch on serialization in the module.xml file -->
+		<ant:unjar src="${maven.build.dir}/${mar.name}" dest="${serialize.temp.path}"/>
+		<ant:replace file="${serialize.temp.path}/META-INF/module.xml" 
+		  token="&lt;!-- &lt;sandesha2:UseMessageSerialization>true&lt;/sandesha2:UseMessageSerialization> -->"
+		  value="&lt;sandesha2:UseMessageSerialization>true&lt;/sandesha2:UseMessageSerialization>"/>
+		<ant:jar jarfile="${serialize.client.path}/modules/${test.module.name}.mar" basedir="${serialize.temp.path}"/>
+		<ant:jar jarfile="${serialize.server.path}/modules/${test.module.name}.mar" basedir="${serialize.temp.path}"/>
+		
+		<delete dir="${build.temp.dir}"/>
+	</goal>
+
     <goal name="repo:create">
         <attainGoal name="server:create"/>
         <attainGoal name="client:create"/>
         <attainGoal name="secure:create"/>
+        <attainGoal name="serialize:create"/>
     </goal>	
     
 	<goal name="server:create" prereqs="mar,sample:create">
@@ -304,7 +335,7 @@
         <ant:copy file="${build.interop.dir}/${interop.service.aar.name}" toDir="${build.repo.dir}/server/services"  overwrite="true"/>
 
         <ant:copy file="${dir.interop}/conf/sec-services.xml" toFile="${dir.interop.service.temp}/META-INF/services.xml" overwrite="true"/>
-        <ant:copy file="${dir.interop}/conf/SecRMInteropService.wsdl" toFile="${dir.interop.service.temp}/META-INF/RMInteropService.wsdl" overwrite="true"/>
+        <ant:copy file="${dir.interop}/conf/SecRMInteropService.wsdl" toFile="${dir.interop.service.temp}/META-INF/RMInteropService.wsdl" overwrite="true"/>
 		<ant:copy file="${dir.interop}/conf/store.jks" toFile="${dir.interop.service.temp}/store.jks" overwrite="true"/>
         <ant:jar jarfile="${build.interop.dir}/${interop.sec.service.aar.name}" basedir="${dir.interop.service.temp}"  overwrite="true"/>
         <ant:copy file="${build.interop.dir}/${interop.sec.service.aar.name}" toDir="${build.repo.dir}/server/services"  overwrite="true"/>
@@ -393,8 +424,8 @@
     
         <ant:copy file="target/${jar.name}" todir="${dir.temp.dist.bin}" />
         <ant:copy file="target/${mar.name}" todir="${dir.temp.dist.bin}" />
-        <ant:copy file="target/${client.jar.name}" todir="${dir.temp.dist.bin}" />
-        <ant:copy file="target/${policy.jar.name}" todir="${dir.temp.dist.bin}" />
+        <ant:copy file="target/${client.jar.name}" todir="${dir.temp.dist.bin}" />
+        <ant:copy file="target/${policy.jar.name}" todir="${dir.temp.dist.bin}" />
                 
        <!-- <ant:copy file="${dir.config}/sandesha2.properties" todir="${dir.temp.dist.bin}" /> -->
         <ant:copy file="${apache.license.file}" todir="${dir.temp.dist.bin}" />

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Fri Feb  2 11:29:56 2007
@@ -355,6 +355,8 @@
 		
 		String EnableRMAnonURI = "EnableRMAnonURI";
 		
+		String UseMessageSerialization = "UseMessageSerialization";
+		
 		public interface DefaultValues {
 			
 			int RetransmissionInterval = 6000;
@@ -386,6 +388,8 @@
 			boolean EnableMakeConnection = true;
 			
 			boolean EnableRMAnonURI = true;
+			
+			boolean UseMessageSerialization = false;
 		}
 	}
 	
@@ -528,6 +532,7 @@
         public static final String ELEM_MAKE_CONNECTION = "MakeConnection";
         public static final String ELEM_ENABLED = "Enabled";
         public static final String ELEM_USE_RM_ANON_URI = "UseRMAnonURI";
+        public static final String ELEM_USE_SERIALIZATION = "UseMessageSerialization";
         
         public static final QName Q_ELEM_POLICY = new QName(URI_POLICY_NS, ELEM_POLICY, ATTR_WSP);
         public static final QName Q_ELEM_RMASSERTION = new QName(URI_RM_POLICY_NS, ELEM_RMASSERTION, ATTR_WSRM);
@@ -547,5 +552,6 @@
         public static final QName Q_ELEM_MAKE_CONNECTION = new QName(URI_RM_POLICY_NS, ELEM_MAKE_CONNECTION, ATTR_WSRM);
         public static final QName Q_ELEM_ENABLED = new QName(URI_RM_POLICY_NS, ELEM_ENABLED, ATTR_WSRM);
         public static final QName Q_ELEM_USE_RM_ANON_URI = new QName(URI_RM_POLICY_NS, ELEM_USE_RM_ANON_URI, ATTR_WSRM);
+        public static final QName Q_ELEM_USE_SERIALIZATION = new QName(URI_RM_POLICY_NS, ELEM_USE_SERIALIZATION, ATTR_WSRM);
     }
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Fri Feb  2 11:29:56 2007
@@ -20,7 +20,6 @@
 import org.apache.axiom.soap.SOAPBody;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
@@ -33,17 +32,13 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.msgprocessors.MakeConnectionProcessor;
 import org.apache.sandesha2.msgprocessors.SequenceProcessor;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.Range;
-import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.wsrm.Sequence;
 
@@ -108,48 +103,10 @@
 			transaction = storageManager.getTransaction();
 
 			RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
-			EndpointReference replyTo = msgContext.getReplyTo();
-			String specVersion = rmMessageContext.getRMSpecVersion();
 				
-			boolean duplicateMessage = isDuplicateMessage (rmMessageContext, storageManager);
-
-			//checking weather the Message belongs to the WSRM 1.0 Anonymous InOut scenario.
-			//If so instead of simply dropping duplicates we will have to attach the corresponding response,
-			//as long as the Message has not been acked.			
-			if (duplicateMessage &&
-					(replyTo==null || replyTo.hasAnonymousAddress()) &&
-					(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
-
-				// Treat the duplicate message as a special kind of poll
-				Sequence sequence = (Sequence) rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-				
-			    SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-			    SenderBean findSenderBean = new SenderBean ();
-			    findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-			    findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
-			    findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
-			    findSenderBean.setSend(true);
-		
-			    SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
-			    
-			    // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
-			    // processor. This will use this thread to re-send the reply, writing it into the transport.
-			    // As the reply is now written we do not want to continue processing, or suspend, so we abort.
-			    if(replyMessageBean != null) {
-			    	if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
-			    	MakeConnectionProcessor.replyToPoll(rmMessageContext, replyMessageBean, storageManager, false, null);
-			    } else {
-			    	if(log.isDebugEnabled()) log.debug("No matching reply for replayed message");
-			    }
-			    
-			    returnValue = InvocationResponse.ABORT;
-				if (log.isDebugEnabled()) log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
-				return returnValue;
-			}
-			
 			boolean shouldMessageBeDropped = shouldMessageBeDropped (rmMessageContext, storageManager);
 			
-			if (duplicateMessage || shouldMessageBeDropped) {
+			if (shouldMessageBeDropped) {
 
 				returnValue = InvocationResponse.ABORT; // the msg has been
 														// dropped
@@ -167,11 +124,7 @@
 			}
 
 		} catch (Exception e) {
-			if (log.isDebugEnabled())
-				log.debug("Caught an exception", e);
-			// message should not be sent in a exception situation.
-			msgContext.pause();
-			returnValue = InvocationResponse.SUSPEND;
+			if (log.isDebugEnabled()) log.debug("Caught an exception", e);
 
 			if (transaction != null) {
 				try {
@@ -203,31 +156,6 @@
 			log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
 		return returnValue;
 	}
-	
-	
-	private boolean isDuplicateMessage (RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {
-		boolean duplicate = false;
-		
-		if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
-
-			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-			long msgNo = sequence.getMessageNumber().getMessageNumber();
-			
-			RMDBean rmdBean = 
-				SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequence.getIdentifier().getIdentifier());
-
-			if (rmdBean != null) {
-				RangeString serverCompletedMessages = rmdBean.getServerCompletedMessages();
-	
-				if (serverCompletedMessages.isMessageNumberInRanges(msgNo))
-					duplicate = true;
-			}
-			
-		}
-		
-		return duplicate;
-	}
-
 	
 	private boolean shouldMessageBeDropped(RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {
 		

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Fri Feb  2 11:29:56 2007
@@ -254,6 +254,8 @@
 	public static final String maximumRetransmissionCountProcessor="maximumRetransmissionCountProcessor";
 	public static final String nullMsgId="nullMsgId";
 	public static final String storageMapNotPresent="storageMapNotPresent";
+	public static final String failedToStoreMessage="failedToStoreMessage";
+	public static final String failedToLoadMessage="failedToLoadMessage";
 	public static final String entryNotPresentForUpdating="entryNotPresentForUpdating";
 	public static final String appMsgIsNull="appMsgIsNull";
 	public static final String invalidMsgNumberList="invalidMsgNumberList";

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Fri Feb  2 11:29:56 2007
@@ -269,6 +269,8 @@
 maximumRetransmissionCountProcessor=MaximumRetransmissionCountProcessor:doAcknowledgementInterval
 nullMsgId=Key (MessageId) is null. Cannot insert.
 storageMapNotPresent=Error: storage Map not present
+failedToStoreMessage=Failed to store message due to exception {0}.
+failedToLoadMessage=Failed to load message due to exception {0}.
 entryNotPresentForUpdating=Entry is not present for updating
 appMsgIsNull=Application message is null
 invalidMsgNumberList=Invalid msg number list

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Fri Feb  2 11:29:56 2007
@@ -17,8 +17,8 @@
 
 package org.apache.sandesha2.msgprocessors;
 
-import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import javax.xml.namespace.QName;
 
@@ -29,9 +29,7 @@
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
 import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2006Constants;
 import org.apache.commons.logging.Log;
@@ -43,6 +41,7 @@
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -56,7 +55,6 @@
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
-import org.apache.sandesha2.wsrm.Nack;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 
 /**
@@ -135,7 +133,6 @@
 		
 		if(log.isDebugEnabled()) log.debug("Got Ack for RM Sequence: " + outSequenceId + ", internalSeqId: " + internalSequenceId);
 		Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges().iterator();
-		Iterator nackIterator = sequenceAck.getNackList().iterator();
 
 		if (FaultManager.checkForUnknownSequence(rmMsgCtx, outSequenceId, storageManager)) {
 			if (log.isDebugEnabled())
@@ -149,106 +146,72 @@
 			return;
 		}
 		
-		SenderBean input = new SenderBean();
-		input.setSend(true);
-		input.setReSend(true);
-		input.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
-		input.setInternalSequenceID(internalSequenceId);
-		Collection retransmitterEntriesOfSequence = retransmitterMgr.find(input);
-		
 		String replyToAddress = rmsBean.getReplyToEPR();
 		EndpointReference replyTo = new EndpointReference (replyToAddress);
-		boolean anonReplyTo = false;
-		if (replyTo.hasAnonymousAddress())
-			anonReplyTo = true;
+		boolean anonReplyTo = replyTo.hasAnonymousAddress();
 		
 		String rmVersion = rmMsgCtx.getRMSpecVersion();
 		
-		boolean syncResponseExpected = false;
-		RangeString ackedMessagesRanges = new RangeString(); //keep track of the ranges in the ack msgs
+		// Compare the clientCompletedMessages with the range we just got, to work out if there
+		// is any new information in this ack message
+		RangeString completedMessages = rmsBean.getClientCompletedMessages();
 		long numberOfNewMessagesAcked = 0;
 
-		while (ackRangeIterator.hasNext()) {
+		while(ackRangeIterator.hasNext()) {
 			AcknowledgementRange ackRange = (AcknowledgementRange) ackRangeIterator.next();
 			long lower = ackRange.getLowerValue();
 			long upper = ackRange.getUpperValue();
 			
-			if(log.isDebugEnabled()) 
-				log.debug("Ack Range: " + lower + " - " + upper);
-			
-			long rangeStart = lower;
-			long messageNo;
-			for (messageNo = lower; messageNo <= upper; messageNo++) {
-				SenderBean retransmitterBean = getRetransmitterEntry(retransmitterEntriesOfSequence, messageNo);
-
-				if (retransmitterBean != null) {
-					
-					// We've got an Ack for a message that hasn't been sent yet !
-					if (retransmitterBean.getSentCount() == 0) {
-						FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
-								storageManager, SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackInvalidNotSent));
-						if (log.isDebugEnabled())
-							log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
-						return;
-					}
-					
-					String storageKey = retransmitterBean.getMessageContextRefKey();
-					
-					boolean syncResponseNeeded = false;
-					if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
-						MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
-						AxisOperation operation = applicationMessage.getAxisOperation();
-						boolean inOutMessage = false;
-						if (operation!=null && 
-								(WSDL20_2004Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()) ||
-										WSDL20_2006Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()))) 
-							inOutMessage = true;
+			// Quick check to see if the whole range is covered
+			if(!completedMessages.isRangeCompleted(new Range(lower, upper))) {
+				// We have new info, so take each message one at a time
+				for (long messageNo = lower; messageNo <= upper; messageNo++) {
+					if(!completedMessages.isMessageNumberInRanges(messageNo)) {
+						// We have a new message to consider
+						numberOfNewMessagesAcked++;
+						completedMessages.addRange(new Range(messageNo, messageNo));
+
+						SenderBean matcher = new SenderBean();
+						matcher.setSequenceID(outSequenceId);
+						matcher.setMessageNumber(messageNo);
+						
+						SenderBean retransmitterBean = retransmitterMgr.findUnique(matcher);
+						if (retransmitterBean != null) {
+							// Check we haven't got an Ack for a message that hasn't been sent yet !
+							if (retransmitterBean.getSentCount() == 0) {
+								FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
+										storageManager, SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackInvalidNotSent));
+								if (log.isDebugEnabled())
+									log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
+								return;
+							}
+							
+							String storageKey = retransmitterBean.getMessageContextRefKey();
 							
-						if (inOutMessage) {	
-							OperationContext operationContext = applicationMessage.getOperationContext();
-							if (operationContext!=null) {
-								MessageContext responseMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-								if (responseMessage==null) {
+							boolean syncResponseNeeded = false;
+							if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
+								MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
+								AxisOperation operation = applicationMessage.getAxisOperation();
+								if (operation!=null && 
+										(WSDL20_2004Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()) ||
+												WSDL20_2006Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()))) 
 									syncResponseNeeded = true;
-									
-									//adding upto the current messageNo
-									//current one will not be considered as we need to get an sync response from an repeat of this.
-									if (rangeStart<messageNo) {
-										ackedMessagesRanges.addRange(new Range(rangeStart, messageNo-1)); 
-										rangeStart = messageNo+1;
-									}
-								}
 							}
-						}
-					}
 
- 					if (!syncResponseNeeded) {
-						// removing the application message from the storage.
-						retransmitterMgr.delete(retransmitterBean.getMessageID());
-						storageManager.removeMessageContext(storageKey);
-						numberOfNewMessagesAcked++;
-					} else {
-						//sync response is needed at least for one message, this should stop the termination.
-						syncResponseExpected = true;
+							if (!syncResponseNeeded) {
+								// removing the application message from the storage.
+								retransmitterMgr.delete(retransmitterBean.getMessageID());
+								storageManager.removeMessageContext(storageKey);
+							}
+						}
 					}
 				}
 			}
-			
-			if (rangeStart<=upper)
-				ackedMessagesRanges.addRange(new Range (rangeStart, upper));
-			
 		}
 
 		// updating the last activated time of the sequence.
 		rmsBean.setLastActivatedTime(System.currentTimeMillis());
 
-		while (nackIterator.hasNext()) {
-			Nack nack = (Nack) nackIterator.next();
-			long msgNo = nack.getNackNumber();
-
-			// TODO - Process Nack
-		}
-		
 		//adding a MakeConnection for the response sequence if needed.
 		if (rmsBean.getOfferedSequence() != null) {
 
@@ -260,75 +223,25 @@
 			
 		}
 
-		// setting acked message date.
-		// TODO add details specific to each message.
-		
 		// We overwrite the previous client completed message ranges with the
 		// latest view, but only if it is an update i.e. contained a new
 		// ack range (which is because we do not previous acks arriving late 
 		// to break us)
 		if (numberOfNewMessagesAcked>0) {
-		  rmsBean.setClientCompletedMessages(ackedMessagesRanges);
-			long noOfMsgsAcked = 
-				rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
+			rmsBean.setClientCompletedMessages(completedMessages);
+			long noOfMsgsAcked = rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
 			rmsBean.setNumberOfMessagesAcked(noOfMsgsAcked);
 		}
 		
-		long lastOutMessage = rmsBean.getLastOutMessage ();
-		
 		// Update the RMSBean
 		storageManager.getRMSBeanMgr().update(rmsBean);
 
-		if (lastOutMessage > 0) {
-			boolean complete = AcknowledgementManager.verifySequenceCompletion(sequenceAck
-					.getAcknowledgementRanges().iterator(), lastOutMessage);
-
-			
-			//If this is RM 1.1 and RMAnonURI scenario, dont do the termination unless the response side createSequence has been
-			//received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
-			boolean pauseTerminationForCS = false;
-			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals( rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
-				pauseTerminationForCS = true;
-				
-				RMDBean findBean = new RMDBean ();
-				findBean.setPollingMode(true);
-				
-				RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
-				Iterator rmdBeanIteratort = rmdBeanMgr.find(findBean).iterator();
-				while (rmdBeanIteratort.hasNext()) {
-					RMDBean rmdBean = (RMDBean) rmdBeanIteratort.next();
-					String toAddress = rmdBean.getToAddress();
-					if (toAddress!=null && toAddress.equals(replyToAddress)) {
-						pauseTerminationForCS = false;
-						break;
-					}
-				}
-				
-				if (pauseTerminationForCS) {
-					rmsBean.setTerminationPauserForCS(true);
-					storageManager.getRMSBeanMgr().update(rmsBean);
-				}
-			}
-			
-			
-			if (complete && !pauseTerminationForCS && !syncResponseExpected && !rmsBean.isTerminateAdded()) {
-				TerminateManager.addTerminateSequenceMessage(rmMsgCtx, internalSequenceId, outSequenceId, storageManager);
-			}
-			
-		}
+		// Try and terminate the sequence
+		TerminateManager.checkAndTerminate(rmMsgCtx, storageManager, rmsBean);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: AcknowledgementProcessor::processAckHeader");
 	}
 
-	private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {
-		Iterator it = collection.iterator();
-		while (it.hasNext()) {
-			SenderBean bean = (SenderBean) it.next();
-			if (bean.getMessageNumber() == msgNo)
-				return bean;
-		}
 
-		return null;
-	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri Feb  2 11:29:56 2007
@@ -225,28 +225,25 @@
 	
 				RMSBean findBean = new RMSBean ();
 				findBean.setReplyToEPR(toAddress);
+				findBean.setTerminationPauserForCS(true);
 				
 				//TODO recheck
 				RMSBean rmsBean = storageManager.getRMSBeanMgr().findUnique(findBean);
-				if (rmsBean!=null) {
-					
-					if (rmsBean.isTerminationPauserForCS()) {
-						//AckManager hs not done the termination. Do the termination here.
-						
-						MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
-						if (requestSideRefMessage==null) {
-							FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg, 
-									SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
-									new Exception());						
-							// Return false if an Exception hasn't been thrown.
-							if (log.isDebugEnabled())
-								log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);				
-							return false;
-						}
-						
-						RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
-						TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+				if (rmsBean!=null) {					
+					//AckManager hs not done the termination. Do the termination here.
+					MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
+					if (requestSideRefMessage==null) {
+						FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg, 
+								SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
+								new Exception());						
+						// Return false if an Exception hasn't been thrown.
+						if (log.isDebugEnabled())
+							log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);				
+						return false;
 					}
+					
+					RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
+					TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
 				}
 			}
 				

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Fri Feb  2 11:29:56 2007
@@ -9,6 +9,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.TransportOutDescription;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -117,10 +118,12 @@
 			String namespace)
 	throws AxisFault
 	{
+		if(log.isDebugEnabled()) log.debug("Entry: MakeConnectionProcessor::replyToPoll");
 		TransportOutDescription transportOut = pollMessage.getMessageContext().getTransportOut();
 		if (transportOut==null) {
 			String message = SandeshaMessageHelper.getMessage(
 					SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
+			if(log.isDebugEnabled()) log.debug(message);
 			throw new SandeshaException (message);
 		}
 			
@@ -128,6 +131,7 @@
 		MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,pollMessage.getConfigurationContext());
 		if (returnMessage==null) {
 			String message = "Cannot find the message stored with the key:" + messageStorageKey;
+			if(log.isDebugEnabled()) log.debug(message);
 			throw new SandeshaException (message);
 		}
 		
@@ -138,6 +142,12 @@
 		
 		// Link the response to the request
 		OperationContext context = pollMessage.getMessageContext().getOperationContext();
+		if(context == null) {
+			AxisOperation oldOperation = returnMessage.getAxisOperation();
+			context = new OperationContext(oldOperation);
+			context.addMessageContext(pollMessage.getMessageContext());
+			pollMessage.getMessageContext().setOperationContext(context);
+		}
 		context.addMessageContext(returnMessage);
 		returnMessage.setOperationContext(context);
 		
@@ -145,12 +155,13 @@
 		
 		//running the MakeConnection through a SenderWorker.
 		//This will allow Sandesha2 to consider both of following senarios equally.
-		//	1. A message being sent by the Sender thread.
+		//  1. A message being sent by the Sender thread.
 		//  2. A message being sent as a reply to an MakeConnection.
 		SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage);
 		worker.setMessage(returnRMMsg);
-
 		worker.run();
+		
+		if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
 	}
 	
 	private static void addMessagePendingHeader (MessageContext returnMessage, String namespace) throws SandeshaException {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Fri Feb  2 11:29:56 2007
@@ -24,6 +24,7 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.engine.Handler.InvocationResponse;
@@ -41,13 +42,17 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.InvokerBean;
 import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
 import org.apache.sandesha2.util.FaultManager;
 import org.apache.sandesha2.util.Range;
 import org.apache.sandesha2.util.RangeString;
 import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.wsrm.Sequence;
 
 /**
@@ -164,8 +169,6 @@
 		}
 		
 		EndpointReference replyTo = rmMsgCtx.getReplyTo();
-		String mep = msgCtx.getAxisOperation().getMessageExchangePattern();
-		
 		String key = SandeshaUtil.getUUID(); // key to store the message.
 		// updating the Highest_In_Msg_No property which gives the highest
 		// message number retrieved from this sequence.
@@ -191,10 +194,47 @@
 		boolean msgNoPresentInList = 
 			serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
 		
+		String specVersion = rmMsgCtx.getRMSpecVersion();
 		if (msgNoPresentInList
 				&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
-			// this is a duplicate message and the invocation type is
-			// EXACTLY_ONCE.
+			// this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
+			// ack messages at this point, as if someone is sending duplicates then they may have
+			// missed earlier acks. We also have special processing for sync 2-way with RM 1.0
+			if((replyTo==null || replyTo.hasAnonymousAddress()) &&
+			   (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
+
+			    SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+			    SenderBean findSenderBean = new SenderBean ();
+			    findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+			    findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+			    findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+			    findSenderBean.setSend(true);
+		
+			    SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+			    
+			    // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+			    // processor. This will use this thread to re-send the reply, writing it into the transport.
+			    // As the reply is now written we do not want to continue processing, or suspend, so we abort.
+			    if(replyMessageBean != null) {
+			    	if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+			    	MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+					result = InvocationResponse.ABORT;
+					if (log.isDebugEnabled())
+						log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
+					return result;
+			    }
+		    }
+			EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+			if (acksTo.hasAnonymousAddress()) {
+				RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, storageManager,false,true);
+				msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+				AcknowledgementManager.sendAckNow(ackRMMsgContext);
+				result = InvocationResponse.ABORT;
+				if (log.isDebugEnabled())
+					log.debug("Exit: SequenceProcessor::processReliableMessage, acking duplicate message: " + result);
+				return result;
+			}
+			
 			result = InvocationResponse.ABORT;
 			if (log.isDebugEnabled())
 				log.debug("Exit: SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
@@ -208,6 +248,29 @@
 		
 		// Update the RMD bean
 		mgr.update(bean);
+		
+		// If we are doing sync 2-way over WSRM 1.0, then we may just have received one of
+		// the reply messages that we were looking for. If so we can remove the matching sender bean.
+		int mep = msgCtx.getAxisOperation().getAxisSpecifMEPConstant();
+		if(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0) &&
+				mep == WSDL20_2004Constants.MEP_CONSTANT_OUT_IN) {
+			RelatesTo relatesTo = msgCtx.getRelatesTo();
+			if(relatesTo != null) {
+				String messageId = relatesTo.getValue();
+				SenderBean matcher = new SenderBean();
+				matcher.setMessageID(messageId);
+				SenderBean sender = storageManager.getSenderBeanMgr().findUnique(matcher);
+				if(sender != null) {
+					if(log.isDebugEnabled()) log.debug("Deleting sender for sync-2-way message");
+					storageManager.removeMessageContext(sender.getMessageContextRefKey());
+					storageManager.getSenderBeanMgr().delete(messageId);
+					
+					// Try and terminate the corresponding outbound sequence
+					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sender.getSequenceID());
+					TerminateManager.checkAndTerminate(rmMsgCtx, storageManager, rmsBean);
+				}
+			}
+		}
 
 		// inorder invocation is still a global property
 		boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
@@ -226,7 +289,7 @@
 //			add an ack entry here
 		
 		boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
-									WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep);
+									WSDL20_2004Constants.MEP_CONSTANT_IN_ONLY == mep;
 		EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
 		if (acksTo.hasAnonymousAddress() && backchannelFree) {
 			Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java Fri Feb  2 11:29:56 2007
@@ -63,6 +63,8 @@
     private boolean enableMakeConnection;
     
     private boolean enableRMAnonURI;
+    
+    private boolean useMessageSerialization;
 
     public void setInactiveTimeoutInterval(long value, String measure) {
         long timeOut = -1;
@@ -270,6 +272,11 @@
         // </wsrm:MakeConnection>
         writer.writeEndElement();
         
+        // <wsrm:UseMessageSerialization />
+        writer.writeStartElement(prefix, Sandesha2Constants.Assertions.Q_ELEM_USE_SERIALIZATION.getLocalPart(), namespaceURI);
+        writer.writeCharacters(Boolean.toString(isUseMessageSerialization()));
+        writer.writeEndElement();
+
         // </wsp:Policy>
         writer.writeEndElement();
 
@@ -334,8 +341,17 @@
 		this.enableRMAnonURI = enableRMAnonURI;
 	}
 
+	public boolean isUseMessageSerialization() {
+		return useMessageSerialization;
+	}
+
+	public void setUseMessageSerialization(boolean useMessageSerialization) {
+		this.useMessageSerialization = useMessageSerialization;
+	}    
+
 	public boolean equal(PolicyComponent policyComponent) {
         // TODO
         return false;
-    }    
+    }
+
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java Fri Feb  2 11:29:56 2007
@@ -148,6 +148,9 @@
                 		propertyBean.setEnableRMAnonURI(Boolean.valueOf(data).booleanValue());
                 	}
                 }
+            } else if (Sandesha2Constants.Assertions.ELEM_USE_SERIALIZATION.equals(name)) {
+            	String value = element.getText().trim();
+            	propertyBean.setUseMessageSerialization(Boolean.valueOf(value).booleanValue());
             }
         }
     }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java Fri Feb  2 11:29:56 2007
@@ -192,6 +192,9 @@
 		
 		else if(bean.getOutOfOrderRanges() != null && !bean.getOutOfOrderRanges().equals(this.getOutOfOrderRanges()))
 			equal = false;
+
+		else if(bean.getToAddress() != null && !bean.getToAddress().equals(this.getToAddress()))
+			equal = false;
 		
 		else if ((bean.rmdFlags & NEXT_MSG_NO_FLAG) != 0 && bean.getNextMsgNoToProcess() != this.getNextMsgNoToProcess())
 			equal = false;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Fri Feb  2 11:29:56 2007
@@ -17,18 +17,27 @@
 
 package org.apache.sandesha2.storage.inmemory;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.HashMap;
 
-import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
@@ -48,7 +57,6 @@
 
 	private static InMemoryStorageManager instance = null;
     private final String MESSAGE_MAP_KEY = "Sandesha2MessageMap";
-    private final String ENVELOPE_MAP_KEY = "Sandesha2EnvelopeMap";
     private RMSBeanMgr  rMSBeanMgr = null;
     private RMDBeanMgr rMDBeanMgr = null;
     private SenderBeanMgr senderBeanMgr = null;
@@ -56,16 +64,23 @@
     private Sender sender = null;
     private Invoker invoker = null;
     private HashMap transactions = new HashMap();
+    private boolean useSerialization = false;
     
-	public InMemoryStorageManager(ConfigurationContext context) {
+	public InMemoryStorageManager(ConfigurationContext context)
+	throws SandeshaException
+	{
 		super(context);
 		
+		SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
+		useSerialization = policy.isUseMessageSerialization();
+
 		this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
 		this.rMDBeanMgr = new InMemoryRMDBeanMgr (this, context);
 		this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
 		this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
 		this.sender = new Sender();
 		this.invoker = new Invoker();
+		
 	}
 
 	public Transaction getTransaction() {
@@ -155,7 +170,9 @@
 	}
 
 	public static InMemoryStorageManager getInstance(
-			ConfigurationContext context) {
+			ConfigurationContext context)
+	throws SandeshaException
+	{
 		if (instance == null)
 			instance = new InMemoryStorageManager(context);
 
@@ -170,32 +187,55 @@
 			return null;
 		}
 		
-		MessageContext messageContext = (MessageContext) storageMap.get(key);
-		
-		HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-		if(envMap==null) {
-			if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::retrieveMessageContext");
-			return null;
-		}
-		
-		//Get hold of the original SOAP envelope
-		SOAPEnvelope envelope = (SOAPEnvelope)envMap.get(key);
-		
-		//Now clone the env and set it in the message context
-		if (envelope!=null) {
-			try {
-				SOAPEnvelope clonedEnvelope = SandeshaUtil.cloneEnvelope(envelope);
-				messageContext.setEnvelope(clonedEnvelope);
-			} catch (AxisFault e) {
-				throw new SandeshaStorageException (e);
+		MessageContext messageContext = null;
+		try {
+			if(useSerialization) {
+				SerializedStorageEntry entry = (SerializedStorageEntry) storageMap.get(key);
+				
+				if(entry != null) {
+					ByteArrayInputStream stream = new ByteArrayInputStream(entry.data);
+					ObjectInputStream is = new ObjectInputStream(stream);
+					messageContext = (MessageContext) is.readObject();
+					messageContext.activate(entry.context);
+
+					OperationContext opCtx = messageContext.getOperationContext();
+					if(opCtx != null) {
+						MessageContext inMsgCtx = opCtx.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+						if(inMsgCtx != null) {
+							inMsgCtx.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, entry.inTransportControl);
+							inMsgCtx.setProperty(MessageContext.TRANSPORT_OUT,               entry.inTransportOut);
+							inMsgCtx.setProperty(Constants.OUT_TRANSPORT_INFO,               entry.inTransportOutInfo);
+						}
+					}
+					
+					messageContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, entry.transportControl);
+					messageContext.setProperty(MessageContext.TRANSPORT_OUT,               entry.transportOut);
+					messageContext.setProperty(Constants.OUT_TRANSPORT_INFO,               entry.transportOutInfo);
+				}
+
+			} else {
+				StorageEntry entry = (StorageEntry) storageMap.get(key);
+				
+				if(entry != null) {
+					messageContext = entry.msgContext;
+					SOAPEnvelope clonedEnvelope = SandeshaUtil.cloneEnvelope(entry.envelope);
+					messageContext.setEnvelope(clonedEnvelope);
+				}
 			}
+		} catch (Exception e) {
+			String message = SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.failedToLoadMessage, e.toString());
+			if(log.isDebugEnabled()) log.debug(message);
+			throw new SandeshaStorageException(message, e);
 		}
-		
+
 		if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::retrieveMessageContext, " + messageContext);
 		return messageContext; 
 	}
 
-	public void storeMessageContext(String key,MessageContext msgContext) {
+	public void storeMessageContext(String key,MessageContext msgContext)
+	throws SandeshaStorageException
+	{
 		if(log.isDebugEnabled()) log.debug("Enter: InMemoryStorageManager::storeMessageContext, key: " + key);
 		HashMap storageMap = (HashMap) getContext().getProperty(MESSAGE_MAP_KEY);
 		
@@ -207,21 +247,46 @@
 		if (key==null)
 		    key = SandeshaUtil.getUUID();
 		
-		storageMap.put(key,msgContext);
-		
-		//Now get hold of the SOAP envelope and store it in the env map
-		HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-		
-		if(envMap==null) {
-			envMap = new HashMap ();
-			getContext().setProperty(ENVELOPE_MAP_KEY, envMap);
-		}
-		
-		SOAPEnvelope envelope = msgContext.getEnvelope();
+		try {
+			if(useSerialization) {
+				ByteArrayOutputStream stream = new ByteArrayOutputStream();
+				ObjectOutputStream s = new ObjectOutputStream(stream);
+				s.writeObject(msgContext);
+				s.close();
+				
+				SerializedStorageEntry entry = new SerializedStorageEntry();
+				entry.data = stream.toByteArray();
+				entry.context = msgContext.getConfigurationContext();
+				
+				OperationContext opCtx = msgContext.getOperationContext();
+				if(opCtx != null) {
+					MessageContext inMsgCtx = opCtx.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+					if(inMsgCtx != null) {
+						entry.inTransportControl = inMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+						entry.inTransportOut     = inMsgCtx.getProperty(MessageContext.TRANSPORT_OUT);
+						entry.inTransportOutInfo = inMsgCtx.getProperty(Constants.OUT_TRANSPORT_INFO);
+					}
+				}
+				entry.transportControl = msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+				entry.transportOut     = msgContext.getProperty(MessageContext.TRANSPORT_OUT);
+				entry.transportOutInfo = msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+				
+				storageMap.put(key, entry);
 
-		//We are storing the original envelope here.
-		//Storing a cloned version will caus HeaderBlocks to loose their setProcessed information.
-		envMap.put(key, envelope);
+			} else {
+				//We are storing the original envelope here.
+				//Storing a cloned version will caus HeaderBlocks to loose their setProcessed information.
+				StorageEntry entry = new StorageEntry();
+				entry.msgContext = msgContext;
+				entry.envelope = msgContext.getEnvelope();
+				storageMap.put(key,entry);
+			}
+		} catch(Exception e) {
+			String message = SandeshaMessageHelper.getMessage(
+					SandeshaMessageKeys.failedToStoreMessage, e.toString());
+			if(log.isDebugEnabled()) log.debug(message);
+			throw new SandeshaStorageException(message, e);
+		}
 		
 		if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::storeMessageContext, key: " + key);
 	}
@@ -236,17 +301,11 @@
 					SandeshaMessageKeys.storageMapNotPresent));
 		}
 		
-		Object oldEntry = storageMap.get(key);
+		Object oldEntry = storageMap.remove(key);
 		if (oldEntry==null)
 			throw new SandeshaStorageException (SandeshaMessageHelper.getMessage(
 					SandeshaMessageKeys.entryNotPresentForUpdating));
 		
-		HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-
-		storageMap.remove(key);
-		if (envMap!=null)
-			envMap.remove(key);
-		
 		storeMessageContext(key,msgContext);
 
 		if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::updateMessageContext, key: " + key);
@@ -256,15 +315,10 @@
 		if(log.isDebugEnabled()) log.debug("Enter: InMemoryStorageManager::removeMessageContext, key: " + key);
 
 		HashMap storageMap = (HashMap) getContext().getProperty(MESSAGE_MAP_KEY);
-		HashMap envelopeMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-		
 
 		if (storageMap!=null)
 			storageMap.remove(key);
 		
-		if (envelopeMap!=null)
-			envelopeMap.remove(key);
-		
 		if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::removeMessageContext, key: " + key);
 	}
 	
@@ -272,6 +326,20 @@
 		
 	}
 
+	private class SerializedStorageEntry {
+		byte[]               data;
+		ConfigurationContext context;
+		Object               transportControl;
+		Object               transportOut;
+		Object               transportOutInfo;
+		Object               inTransportControl;
+		Object               inTransportOut;
+		Object               inTransportOutInfo;
+	}
+	private class StorageEntry {
+		MessageContext msgContext;
+		SOAPEnvelope   envelope;
+	}
 }
 
 

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Fri Feb  2 11:29:56 2007
@@ -98,7 +98,7 @@
 					}
 
 					try {
-						if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + other);
+						if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
 						bean.wait();
 					} catch(InterruptedException e) {
 						// Do nothing

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Fri Feb  2 11:29:56 2007
@@ -284,29 +284,14 @@
 	
 	
 
-	public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
+	public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
 
-		HashMap startMap = new HashMap();
-
-		while (ackRangesIterator.hasNext()) {
-			AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
-			startMap.put(new Long(temp.getLowerValue()), temp);
-		}
-
-		long start = 1;
 		boolean result = false;
-		while (!result) {
-			AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
-			if (temp == null) {
-				break;
-			}
-
-			if (temp.getUpperValue() >= lastMessageNo)
-				result = true;
-
-			start = temp.getUpperValue() + 1;
+		Range complete = new Range(1, lastMessageNo);
+		if(ackRanges.isRangeCompleted(complete)) {
+			result = true;
 		}
 
 		if (log.isDebugEnabled())

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java Fri Feb  2 11:29:56 2007
@@ -66,7 +66,7 @@
 		
 		propertyBean.setEnableMakeConnection(Sandesha2Constants.Properties.DefaultValues.EnableMakeConnection);
 		propertyBean.setEnableRMAnonURI(Sandesha2Constants.Properties.DefaultValues.EnableRMAnonURI);
-		
+		propertyBean.setUseMessageSerialization(Sandesha2Constants.Properties.DefaultValues.UseMessageSerialization);
 		return propertyBean;
 	}
 
@@ -115,6 +115,9 @@
 			String enableMakeConnection = properties.getProperty(Sandesha2Constants.Properties.EnableMakeConnection);
 			loadEnableMakeConnection(enableMakeConnection, propertyBean);
 			
+			String useSerlialization = properties.getProperty(Sandesha2Constants.Properties.UseMessageSerialization);
+			loadUseSerialization(useSerlialization, propertyBean);
+			
 			String messageTypesToDrop = properties.getProperty(Sandesha2Constants.Properties.MessageTypesToDrop);
 			loadMessageTypesToDrop(messageTypesToDrop, propertyBean);
 
@@ -173,6 +176,10 @@
 		String enableMakeConnection = (String) enableMakeConnectionParam.getValue();
 		loadEnableMakeConnection(enableMakeConnection, propertyBean);
 
+		Parameter useSerializationParam = desc.getParameter(Sandesha2Constants.Properties.UseMessageSerialization);
+		String useSerialization = (String) useSerializationParam.getValue();
+		loadUseSerialization(useSerialization, propertyBean);
+
 		Parameter messageTypesToDropParam = desc.getParameter(Sandesha2Constants.Properties.MessageTypesToDrop);
 		String messageTypesToDrop = (String) messageTypesToDropParam.getValue();
 		loadMessageTypesToDrop(messageTypesToDrop, propertyBean);
@@ -546,6 +553,19 @@
 				propertyBean.setEnableMakeConnection(true);
 			} else if (enableMakeConnection.equalsIgnoreCase("false")) {
 				propertyBean.setEnableMakeConnection(false);
+			}
+		}
+	}
+
+	private static void loadUseSerialization(String useSerialization, SandeshaPolicyBean propertyBean)
+	throws SandeshaException {
+
+		if (useSerialization != null) {
+			useSerialization = useSerialization.trim();
+			if (useSerialization.equalsIgnoreCase("true")) {
+				propertyBean.setUseMessageSerialization(true);
+			} else if (useSerialization.equalsIgnoreCase("false")) {
+				propertyBean.setUseMessageSerialization(false);
 			}
 		}
 	}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Fri Feb  2 11:29:56 2007
@@ -537,9 +537,6 @@
 				}
 			}
 
-            
-			newMessageContext.setExecutionChain(referenceMessage.getExecutionChain());
-
 			return newMessageContext;
 
 		} catch (AxisFault e) {
@@ -967,44 +964,58 @@
 		
 		MessageContext msgContext = rmMsgContext.getMessageContext();
 		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-		
-		// message will be stored in the Sandesha2TransportSender
-		msgContext.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
-
-		TransportOutDescription transportOut = msgContext.getTransportOut();
-
-		msgContext.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
-		msgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
 
-		Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
-		msgContext.setTransportOut(sandesha2TransportOutDesc);
+		SandeshaPolicyBean policy = getPropertyBean(msgContext.getAxisOperation());
+		if(policy.isUseMessageSerialization()) {
+			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_TRUE);
 
- 		// sending the message once through Sandesha2TransportSender.
- 		AxisEngine engine = new AxisEngine(configurationContext);
-
-		if (msgContext.isPaused())
-			engine.resumeSend(msgContext);
-		else {
-			//this invocation has to be a blocking one.
-			
-			Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
-			if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
-				msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
-			
-			engine.send(msgContext);
+			StorageManager store = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
+			store.storeMessageContext(storageKey, msgContext);
 			
-			msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+		} else {
+			// message will be stored in the Sandesha2TransportSender
+			msgContext.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
+	
+			TransportOutDescription transportOut = msgContext.getTransportOut();
+	
+			msgContext.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
+			msgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+	
+			Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
+			msgContext.setTransportOut(sandesha2TransportOutDesc);
+	
+	 		// sending the message once through Sandesha2TransportSender.
+	 		AxisEngine engine = new AxisEngine(configurationContext);
+	
+			if (msgContext.isPaused())
+				engine.resumeSend(msgContext);
+			else {
+				//this invocation has to be a blocking one.
+				
+				Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+				if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+					msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+				
+				engine.send(msgContext);
+				
+				msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+			}
 		}
-
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaUtil::executeAndStore");
 	}
 	
-	public static void modifyExecutionChainForStoring (MessageContext message) {
+	public static void modifyExecutionChainForStoring (MessageContext message)
+	throws SandeshaException
+	{
 		
 		Object property = message.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
 		if (property!=null)
 			return; //Phases are already set. Dont hv to redo.
+		
+		SandeshaPolicyBean policy = getPropertyBean(message.getAxisOperation());
+		if(policy.isUseMessageSerialization())
+			return; // No need to mess with the transport when we use message serialization
 		
 		TransportOutDescription transportOutDescription = message.getTransportOut();
 		if (!(transportOutDescription instanceof Sandesha2TransportOutDesc))

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Fri Feb  2 11:29:56 2007
@@ -20,6 +20,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
@@ -33,6 +34,7 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -57,6 +59,63 @@
 
 	public static HashMap receivingSideCleanMap = new HashMap();
 
+	public static void checkAndTerminate(RMMsgContext relatedMessage, StorageManager storageManager, RMSBean rmsBean)
+	throws SandeshaStorageException, AxisFault {
+		if(log.isDebugEnabled()) log.debug("Entry: TerminateManager::checkAndTerminate");
+
+		long lastOutMessage = rmsBean.getLastOutMessage ();
+
+		if (lastOutMessage > 0 && !rmsBean.isTerminateAdded()) {
+			
+			boolean complete = AcknowledgementManager.verifySequenceCompletion(rmsBean.getClientCompletedMessages(), lastOutMessage);
+			
+			//If this is RM 1.1 and RMAnonURI scenario, dont do the termination unless the response side createSequence has been
+			//received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
+			String rmVersion = rmsBean.getRMVersion();
+			String replyToAddress = rmsBean.getReplyToEPR();
+
+			if (complete &&
+					Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+				RMDBean findBean = new RMDBean ();
+				findBean.setPollingMode(true);
+				findBean.setToAddress(replyToAddress);
+
+				RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+				List beans = rmdBeanMgr.find(findBean);
+				System.out.println("Checking replyTo" + replyToAddress);
+				if(beans.isEmpty()) {
+					System.out.println("No beans.");
+					rmsBean.setTerminationPauserForCS(true);
+					storageManager.getRMSBeanMgr().update(rmsBean);
+					complete = false;
+				} else {
+					System.out.println("Matcher:\n" + findBean + "\nFound beans:\n" + beans);
+				}
+			}
+			
+			// If we are doing sync 2-way over WSRM 1.0 then we may need to keep sending messages,
+			// so check to see if all the senders have been removed
+			EndpointReference replyTo = new EndpointReference (replyToAddress);
+			if (complete &&
+					Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && replyTo.hasAnonymousAddress()) {
+				SenderBean matcher = new SenderBean();
+				matcher.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+				matcher.setSequenceID(rmsBean.getSequenceID());
+				
+				List matches = storageManager.getSenderBeanMgr().find(matcher);
+				if(!matches.isEmpty()) complete = false;
+			}
+			
+			if (complete) {
+				addTerminateSequenceMessage(relatedMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+			}
+			
+		}
+
+		if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
+	}
+	
+	
 	/**
 	 * Called by the receiving side to remove data related to a sequence. e.g.
 	 * After sending the TerminateSequence message. Calling this methods will

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Fri Feb  2 11:29:56 2007
@@ -27,6 +27,7 @@
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -127,12 +128,12 @@
 			// or the message can't go anywhere. If there is nothing here then we leave the
 			// message in the sender queue, and a MakeConnection (or a retransmitted request)
 			// will hopefully pick it up soon.
+			RequestResponseTransport t = null;
 			Boolean makeConnection = (Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
 			EndpointReference toEPR = msgCtx.getTo();
 			if(toEPR.hasAnonymousAddress() &&
 				(makeConnection == null || !makeConnection.booleanValue())) {
 
-				RequestResponseTransport t = null;
 				MessageContext inMsg = null;
 				OperationContext op = msgCtx.getOperationContext();
 				if (op != null)
@@ -185,37 +186,49 @@
 			}
 
 			try {
-
-				// had to fully build the SOAP envelope to support
-				// retransmissions.
-				// Otherwise a 'parserAlreadyAccessed' exception could
-				// get thrown in retransmissions.
-				// But this has a performance reduction.
-				msgCtx.getEnvelope().build();
-
-				ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
-				if (retransmittablePhases!=null) {
-					msgCtx.setExecutionChain(retransmittablePhases);
+				AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
+				InvocationResponse response = InvocationResponse.CONTINUE;
+				
+				SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
+				if(policy.isUseMessageSerialization()) {
+					if(msgCtx.isPaused()) {
+						if (log.isDebugEnabled())
+							log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+						msgCtx.setPaused(false);
+						msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+						response = engine.resumeSend(msgCtx);
+					} else {
+						if (log.isDebugEnabled())
+							log.debug("Sending a message : " + msgCtx.getEnvelope().getHeader());
+						msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+						engine.send(msgCtx);  // TODO check if this should return an invocation response
+					}
 				} else {
-					ArrayList emptyExecutionChain = new ArrayList ();
-					msgCtx.setExecutionChain(emptyExecutionChain);
-				}
+					// had to fully build the SOAP envelope to support
+					// retransmissions.
+					// Otherwise a 'parserAlreadyAccessed' exception could
+					// get thrown in retransmissions.
+					// But this has a performance reduction.
+					msgCtx.getEnvelope().build();
+	
+					ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
+					if (retransmittablePhases!=null) {
+						msgCtx.setExecutionChain(retransmittablePhases);
+					} else {
+						ArrayList emptyExecutionChain = new ArrayList ();
+						msgCtx.setExecutionChain(emptyExecutionChain);
+					}
+					
+					msgCtx.setCurrentHandlerIndex(0);
+					msgCtx.setCurrentPhaseIndex(0);
+					msgCtx.setPaused(false);
 				
-				msgCtx.setCurrentHandlerIndex(0);
-				msgCtx.setCurrentPhaseIndex(0);
-				msgCtx.setPaused(false);
-			
-				AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
-				if (log.isDebugEnabled())
-					log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
-				InvocationResponse response = engine.resumeSend(msgCtx);
+					if (log.isDebugEnabled())
+						log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+					response = engine.resumeSend(msgCtx);
+				}
 				if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
 				if(response != InvocationResponse.SUSPEND) {
-					RequestResponseTransport t = null;
-					MessageContext inMsg = null;
-					OperationContext op = msgCtx.getOperationContext();
-					if(op != null) inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
-					if(inMsg != null) t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
 					if(t != null) {
 						if(log.isDebugEnabled()) log.debug("Signalling transport in " + t);
 						if(t != null) t.signalResponseReady();

Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java Fri Feb  2 11:29:56 2007
@@ -42,8 +42,7 @@
 	private static boolean serverStarted = false; 
 	private static ConfigurationContext configContext = null;
 
-	private int serverPort = DEFAULT_SERVER_TEST_PORT;
-	private String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+	protected String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
 	
 	protected String repoPath = "target" + File.separator + "repos" + File.separator + "server";
 	protected String axis2_xml = "target" + File.separator + "repos" + File.separator + "server" + File.separator + "server_axis2.xml";

Added: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java?view=auto&rev=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java (added)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java Fri Feb  2 11:29:56 2007
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2007 The Apache Software Foundation.
+ * Copyright 2007 International Business Machines Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sandesha2.scenarios;
+
+import java.io.File;
+
+public class SerializableScenariosTest extends RMScenariosTest {
+
+	public SerializableScenariosTest() {
+		super("SerializableScenariosTest");
+		this.repoPath = "target" + File.separator + "repos" + File.separator + "serialize-server";
+		this.axis2_xml = repoPath + File.separator + "server_axis2.xml";
+		
+		this.repoPathClient = "target" + File.separator + "repos" + File.separator + "serialize-client";
+		this.axis2_xmlClient = repoPathClient + File.separator + "client_axis2.xml";
+	}
+
+}

Propchange: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java Fri Feb  2 11:29:56 2007
@@ -115,7 +115,11 @@
 				}
 			}
 		}
-		if(!foundToken) throw new SandeshaException("Message was not secured with the correct token(s)");
+		if(!foundToken) {
+			SandeshaException e = new SandeshaException("Message was not secured with the correct token(s)");
+			e.printStackTrace(System.err);
+			throw e;
+		}
 
 		log.debug("Exit: UnitTestSecurityManager::checkProofOfPossession");
 	}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org