You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2005/10/31 06:20:08 UTC

svn commit: r329749 - in /webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/ msgprocessors/ storage/beanmanagers/ storage/beans/ util/ workers/

Author: chamikara
Date: Sun Oct 30 21:19:04 2005
New Revision: 329749

URL: http://svn.apache.org/viewcvs?rev=329749&view=rev
Log:
Added the SandeshaGlobal handler (SandeshaGlobalInHandler). This is used to hold logic that should be carried out globally (i.e. applied to all services). Because of the way Axis2 is currently implemented, Sandesha had to be made a global module. The service-specific handlers will check a property in services.xml to detect whether RM is enabled for the service.

Added the following logic to the global handler:
1. Eliminating duplicate messages.
2. Setting the relatesTo value of ack messages to null (some frameworks, e.g. Indigo, tend to send a relatesTo value for the ack, which could cause an invalid dispatch).
3. Sending ack messages for duplicate messages if required.

Made some corrections in the storage classes.

Added:
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Modified:
    webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/StorageMapBean.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
    webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Constants.java Sun Oct 30 21:19:04 2005
@@ -17,6 +17,8 @@
 
 package org.apache.sandesha2;
 
+
+
 /**
  * @author Chamikara
  * @author Sanka
@@ -97,7 +99,7 @@
 	}
 
 	public interface WSP {
-		long RETRANSMISSION_INTERVAL = 20000;
+		long RETRANSMISSION_INTERVAL = 15000;
 	}
 
 	public interface MessageTypes {
@@ -228,6 +230,8 @@
 
 	String OUT_HANDLER_NAME = "SandeshaOutHandler";
 	
+	String GLOBAL_IN_HANDLER_NAME= "GlobalInHandler";
+	
 	String SEQUENCE_KEY = "SequenceKey";
 
 	//message context properties
@@ -250,5 +254,7 @@
 	String AcksTo = "AcksToProperty";
 	
 	String OFFERED_SEQUENCE_ID = "OfferedSequenceId";
+	
+	String ACK_PROCSSED = "AckProcessed";
 	
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/RMMsgContext.java Sun Oct 30 21:19:04 2005
@@ -121,6 +121,14 @@
 	public String getMessageId() {
 		return msgContext.getMessageID();
 	}
+	
+	public void setFaultTo (EndpointReference epr) {
+		msgContext.setFaultTo(epr);
+	}
+	
+	public EndpointReference getFaultTo () {
+		return msgContext.getFaultTo();
+	}
 
 	public SOAPEnvelope getSOAPEnvelope() {
 		return msgContext.getEnvelope();
@@ -209,4 +217,7 @@
 		return msgContext.getSystemContext();
 	}
 
+	public void setSOAPAction (String SOAPAction) {
+		msgContext.setSoapAction(SOAPAction);
+	}
 }

Added: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=329749&view=auto
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (added)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Sun Oct 30 21:19:04 2005
@@ -0,0 +1,144 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+/**
+ * @author Chamikara
+ * @author Sanka
+ */
+
+package org.apache.sandesha2.handlers;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.sandesha2.Constants;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.storage.beans.StorageMapBean;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
+
+public class SandeshaGlobalInHandler extends AbstractHandler {
+
+	public void invoke(MessageContext msgContext) throws AxisFault {
+		
+		
+		try {
+			RMMsgContext rmMessageContext = MsgInitializer
+					.initializeMessage(msgContext);
+			
+			//Dropping duplicates
+			boolean dropped = dropIfDuplicate (rmMessageContext);
+			if (dropped) {
+				processDroppedMessage (rmMessageContext);
+				return;
+			}
+			
+			//Process if global processing possible. - Currently none
+			if (SandeshaUtil.isGloballyProcessableMessageType(rmMessageContext
+					.getMessageType())) {
+				doGlobalProcessing (rmMessageContext);
+			}
+
+		} catch (SandeshaException e) {
+			throw new AxisFault(e.getMessage());
+		}
+
+	}
+	
+	private boolean dropIfDuplicate (RMMsgContext rmMsgContext) throws SandeshaException {
+		
+		boolean drop = false;
+		
+		if (rmMsgContext.getMessageType()==Constants.MessageTypes.APPLICATION) {
+			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Constants.MessageParts.SEQUENCE);
+			String sequenceId = null;
+			
+			if (sequence!=null) {
+				sequenceId = sequence.getIdentifier().getIdentifier();
+			}
+			
+			long msgNo = sequence.getMessageNumber().getMessageNumber();
+			
+			if (sequenceId!=null && msgNo>0) {				
+				SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory.getInstance(rmMsgContext.getContext()).getSequencePropretyBeanMgr();
+				SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,Constants.SequenceProperties.RECEIVED_MESSAGES);
+				if (receivedMsgsBean!=null) {
+					String receivedMsgStr = (String) receivedMsgsBean.getValue();
+					ArrayList msgNoArrList = SandeshaUtil.getSplittedMsgNoArraylist(receivedMsgStr);
+					
+					if (msgNoArrList.contains(new Long (msgNo).toString())){
+						drop = true;
+					}
+				}
+			}
+		}
+		
+		if (drop) {
+			rmMsgContext.getMessageContext().setPausedTrue(getName());
+			return true;
+		}
+		
+		return false;
+	}
+	
+	private void processDroppedMessage (RMMsgContext rmMsgContext) throws SandeshaException {
+		if (rmMsgContext.getMessageType()==Constants.MessageTypes.APPLICATION) {
+			Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Constants.MessageParts.SEQUENCE);
+			String sequenceId = null;
+			
+			if (sequence!=null) {
+				sequenceId = sequence.getIdentifier().getIdentifier();
+			}
+			
+			SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory.getInstance(rmMsgContext.getContext()).getSequencePropretyBeanMgr();
+			SequencePropertyBean receivedMsgsBean = seqPropMgr.retrieve(sequenceId,Constants.SequenceProperties.RECEIVED_MESSAGES);
+			String receivedMsgStr = (String) receivedMsgsBean.getValue();
+			
+			ApplicationMsgProcessor ackProcessor = new ApplicationMsgProcessor ();
+			//Even though the duplicate message is dropped, hv to send the ack if needed.
+			ackProcessor.sendAckIfNeeded(rmMsgContext,receivedMsgStr);
+
+
+		}
+	}
+	
+	private void doGlobalProcessing (RMMsgContext rmMsgCtx) throws SandeshaException {
+		switch (rmMsgCtx.getMessageType()) {
+		case Constants.MessageTypes.ACK:
+			rmMsgCtx.setRelatesTo(null);	//Removing the relatesTo part from ackMessageIf present. 
+											//Some Frameworks tend to send this.
+		}
+	}
+	
+	public QName getName () {
+		return new QName (Constants.GLOBAL_IN_HANDLER_NAME);
+	}
+}
\ No newline at end of file

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Sun Oct 30 21:19:04 2005
@@ -75,7 +75,7 @@
 		} catch (SandeshaException ex) {
 			throw new AxisFault("Cant initialize the message");
 		}
-
+		
 		//TODO recheck
 		//continue only if an possible application message
 		if (!(rmMsgCtx.getMessageType() == Constants.MessageTypes.UNKNOWN)) {
@@ -188,11 +188,19 @@
 							.getProperty(Constants.AcksTo);
 					
 					//If acksTo is not anonymous. Start the listner  TODO: verify
-					if (!Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+					if (!Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo) && !serverSide) {
 						String transportIn = (String) context.getProperty(MessageContext.TRANSPORT_IN);
 						if (transportIn==null)
 							transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
 						ListenerManager.makeSureStarted(transportIn,context);
+					}else if (acksTo==null && serverSide) {
+						String incomingSequencId = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(tempSequenceId);
+						SequencePropertyBean bean = seqPropMgr.retrieve(incomingSequencId,Constants.SequenceProperties.REPLY_TO_EPR); 
+						if (bean!=null) {
+							EndpointReference acksToEPR = (EndpointReference) bean.getValue();
+							if (acksToEPR!=null)
+								acksTo = (String) acksToEPR.getAddress();
+						}
 					}
 					
 					addCreateSequenceMessage(rmMsgCtx, tempSequenceId, acksTo);
@@ -294,18 +302,23 @@
 					// ("http://localhost:9070/somethingWorking"));
 
 					//Setting WSA Action if null
-					//TODO: Recheck weather this action is correct
-					if (msgCtx.getWSAAction() == null) {
-						EndpointReference toEPR = msgCtx.getTo();
+					//TODO: Recheck weather this actions are correct
+					EndpointReference toEPR = msgCtx.getTo();
 
-						if (toEPR == null)
-							throw new SandeshaException("To EPR is not found");
+					if (toEPR == null)
+						throw new SandeshaException("To EPR is not found");
 
-						String to = toEPR.getAddress();
-						String operationName = msgCtx.getOperationContext()
-								.getAxisOperation().getName()
-								.getLocalPart();
+					String to = toEPR.getAddress();
+					String operationName = msgCtx.getOperationContext()
+							.getAxisOperation().getName()
+							.getLocalPart();
+					
+					if (msgCtx.getWSAAction() == null) {
 						msgCtx.setWSAAction(to + "/" + operationName);
+					}
+					
+					if (msgCtx.getSoapAction()==null) {
+						msgCtx.setSoapAction("\"" + to+"/" + operationName + "\"");
 					}
 
 					//processing the response

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Sun Oct 30 21:19:04 2005
@@ -55,13 +55,8 @@
 
 public class AcknowledgementProcessor implements MsgProcessor {
 
-//	public static void notifyAllWaitingOnKey () {
-//	
-//		SandeshaOutHandler.key.notifyAll();
-//	}
-	
 	public void processMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
-		
+
 		SequenceAcknowledgement sequenceAck = (SequenceAcknowledgement) rmMsgCtx
 				.getMessagePart(Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
 		if (sequenceAck == null)
@@ -71,32 +66,38 @@
 		if (context == null)
 			throw new SandeshaException("Context is null");
 
+		
+		RetransmitterBeanMgr retransmitterMgr = AbstractBeanMgrFactory
+				.getInstance(context).getRetransmitterBeanMgr();
+		SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
+				.getInstance(context).getSequencePropretyBeanMgr();
+
 		Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges()
 				.iterator();
-		
+
 		Iterator nackIterator = sequenceAck.getNackList().iterator();
 		String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
 		if (outSequenceId == null || "".equals(outSequenceId))
 			throw new SandeshaException("OutSequenceId is null");
 
-		RetransmitterBeanMgr retransmitterMgr = AbstractBeanMgrFactory
-				.getInstance(context).getRetransmitterBeanMgr();
-		SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
-				.getInstance(context).getSequencePropretyBeanMgr();
-
 		SequencePropertyBean tempSequenceBean = seqPropMgr.retrieve(
-				outSequenceId,
-				Constants.SequenceProperties.TEMP_SEQUENCE_ID);
-		
-		if (tempSequenceBean == null
-				|| tempSequenceBean.getValue() == null)
-			throw new SandeshaException(
-					"TempSequenceId is not set correctly");
+				outSequenceId, Constants.SequenceProperties.TEMP_SEQUENCE_ID);
 
-		String tempSequenceId = (String) tempSequenceBean.getValue();
-		
+		if (tempSequenceBean == null || tempSequenceBean.getValue() == null)
+			throw new SandeshaException("TempSequenceId is not set correctly");
 
+		String tempSequenceId = (String) tempSequenceBean.getValue();
 
+	
+		
+		//Following happens in the SandeshaGlobal handler
+		rmMsgCtx.getMessageContext().setProperty(Constants.ACK_PROCSSED,"true");
+		
+		//Removing relatesTo - Some WSRM endpoints tend to set relatesTo value for ack messages.
+		//Because of this dispatching may go wrong.
+		//So we set relatesTo value to null for ackMessages. (this happens in the SandeshaGlobal handler)
+		rmMsgCtx.setRelatesTo(null);
+		
 		RetransmitterBean input = new RetransmitterBean();
 		input.setTempSequenceId(tempSequenceId);
 		Collection retransmitterEntriesOfSequence = retransmitterMgr
@@ -125,35 +126,52 @@
 			//TODO - Process Nack
 		}
 
-		//If all messages up to last message have been acknowledged.
-		//Add terminate Sequence message.
-		SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
-				tempSequenceId,
-				Constants.SequenceProperties.LAST_OUT_MESSAGE);
-		if (lastOutMsgBean != null) {
-			Long lastOutMsgNoLng = (Long) lastOutMsgBean.getValue();
-			if (lastOutMsgNoLng == null)
-				throw new SandeshaException(
-						"Invalid object set for the Last Out Message");
-
-			long lastOutMessageNo = lastOutMsgNoLng.longValue();
-			if (lastOutMessageNo <= 0)
-				throw new SandeshaException(
-						"Invalid value set for the last out message");
-
-			boolean complete = SandeshaUtil.verifySequenceCompletion(
-					sequenceAck.getAcknowledgementRanges().iterator(),
-					lastOutMessageNo);
+		
+		
+		
+//		boolean justSendTerminateIfNeeded = false;
+//		String ackProcessed = (String) rmMsgCtx.getProperty(Constants.ACK_PROCSSED);
+//		if (ackProcessed!=null && "true".equals(ackProcessed))
+//			justSendTerminateIfNeeded = true;
 			
-			if (complete) {
-				addTerminateSequenceMessage(rmMsgCtx, outSequenceId,tempSequenceId);
-			}
+		//following get called in the SandesaInHandler
+		//if (justSendTerminateIfNeeded) {
+			//If all messages up to last message have been acknowledged.
+			//Add terminate Sequence message.
+			SequencePropertyBean lastOutMsgBean = seqPropMgr.retrieve(
+					tempSequenceId,
+					Constants.SequenceProperties.LAST_OUT_MESSAGE);
+			if (lastOutMsgBean != null) {
+				Long lastOutMsgNoLng = (Long) lastOutMsgBean.getValue();
+				if (lastOutMsgNoLng == null)
+					throw new SandeshaException(
+							"Invalid object set for the Last Out Message");
+
+				long lastOutMessageNo = lastOutMsgNoLng.longValue();
+				if (lastOutMessageNo <= 0)
+					throw new SandeshaException(
+							"Invalid value set for the last out message");
+
+				boolean complete = SandeshaUtil.verifySequenceCompletion(
+						sequenceAck.getAcknowledgementRanges().iterator(),
+						lastOutMessageNo);
+
+				if (complete) {
+					addTerminateSequenceMessage(rmMsgCtx, outSequenceId,
+							tempSequenceId);
+				}
+				
+				//stopping the progress of the message further.
+				rmMsgCtx.getMessageContext().setPausedTrue(
+						new QName(Constants.IN_HANDLER_NAME));
+				
+				//return;
+			//}
 		}
-
-		int i = 1;
 		
-		//stopping the progress of the message further.
-		rmMsgCtx.getMessageContext().setPausedTrue(new QName (Constants.IN_HANDLER_NAME));
+		int i = 1;
+
+
 	}
 
 	private RetransmitterBean getRetransmitterEntry(Collection collection,
@@ -169,32 +187,41 @@
 	}
 
 	public void addTerminateSequenceMessage(RMMsgContext incomingAckRMMsg,
-			String outSequenceId,String tempSequenceId) throws SandeshaException {
+			String outSequenceId, String tempSequenceId)
+			throws SandeshaException {
 		RMMsgContext terminateRMMessage = RMMsgCreator
 				.createTerminateSequenceMessage(incomingAckRMMsg, outSequenceId);
 
 		//detting addressing headers.
-		SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory.getInstance(
-				incomingAckRMMsg.getContext()).getSequencePropretyBeanMgr();
-		//SequencePropertyBean replyToBean = seqPropMgr.retrieve(tempSequenceId,Constants.SequenceProperties.REPLY_TO_EPR);
-		SequencePropertyBean toBean = seqPropMgr.retrieve(tempSequenceId,Constants.SequenceProperties.TO_EPR);
-//		if (replyToBean==null)
-//			throw new SandeshaException ("ReplyTo property is not set");
-		
-//		EndpointReference replyToEPR = (EndpointReference) replyToBean.getValue();
-//		if (replyToEPR==null)
-//			throw new SandeshaException ("ReplyTo EPR has an invalid value");
-		 
+		SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
+				.getInstance(incomingAckRMMsg.getContext())
+				.getSequencePropretyBeanMgr();
+		//SequencePropertyBean replyToBean =
+		// seqPropMgr.retrieve(tempSequenceId,Constants.SequenceProperties.REPLY_TO_EPR);
+		SequencePropertyBean toBean = seqPropMgr.retrieve(tempSequenceId,
+				Constants.SequenceProperties.TO_EPR);
+		//		if (replyToBean==null)
+		//			throw new SandeshaException ("ReplyTo property is not set");
+
+		//		EndpointReference replyToEPR = (EndpointReference)
+		// replyToBean.getValue();
+		//		if (replyToEPR==null)
+		//			throw new SandeshaException ("ReplyTo EPR has an invalid value");
+
 		EndpointReference toEPR = (EndpointReference) toBean.getValue();
-		if (toEPR==null)
-			throw new SandeshaException ("To EPR has an invalid value");
-		
-		terminateRMMessage.setTo(new EndpointReference (toEPR.getAddress()));
-		
-		//terminateRMMessage.setFrom(new EndpointReference (replyToEPR.getAddress()));
+		if (toEPR == null)
+			throw new SandeshaException("To EPR has an invalid value");
+
+		terminateRMMessage.setTo(new EndpointReference(toEPR.getAddress()));
+		terminateRMMessage.setFrom(new EndpointReference (Constants.WSA.NS_URI_ANONYMOUS));
+		terminateRMMessage.setFaultTo(new EndpointReference (Constants.WSA.NS_URI_ANONYMOUS));
+		//terminateRMMessage.setFrom(new EndpointReference
+		// (replyToEPR.getAddress()));
 		terminateRMMessage
 				.setWSAAction(Constants.WSRM.Actions.TERMINATE_SEQUENCE);
-
+		terminateRMMessage
+			.setSOAPAction("\"" + Constants.WSRM.Actions.TERMINATE_SEQUENCE + "\"");
+		
 		try {
 			terminateRMMessage.addSOAPEnvelope();
 		} catch (AxisFault e) {
@@ -205,20 +232,23 @@
 				.getMessageContext());
 		RetransmitterBean terminateBean = new RetransmitterBean();
 		terminateBean.setKey(key);
-		
-		//Set a retransmitter lastSentTime so that terminate will be send with some delay.
+
+		//Set a retransmitter lastSentTime so that terminate will be send with
+		// some delay.
 		//Otherwise this get send before return of the current request (ack).
 		//TODO verify that the time given is correct
-		terminateBean.setLastSentTime(System.currentTimeMillis()+Constants.TERMINATE_DELAY);
-		
+		terminateBean.setLastSentTime(System.currentTimeMillis()
+				+ Constants.TERMINATE_DELAY);
+
 		terminateBean.setMessageId(terminateRMMessage.getMessageId());
 		terminateBean.setSend(true);
 		terminateBean.setReSend(false);
 
-		RetransmitterBeanMgr retramsmitterMgr = AbstractBeanMgrFactory.getInstance(
-				incomingAckRMMsg.getContext()).getRetransmitterBeanMgr();
+		RetransmitterBeanMgr retramsmitterMgr = AbstractBeanMgrFactory
+				.getInstance(incomingAckRMMsg.getContext())
+				.getRetransmitterBeanMgr();
 		retramsmitterMgr.insert(terminateBean);
-		
+
 	}
 
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Sun Oct 30 21:19:04 2005
@@ -31,6 +31,13 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.context.ServiceGroupContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.soap.SOAPEnvelope;
 import org.apache.sandesha2.Constants;
@@ -40,9 +47,11 @@
 import org.apache.sandesha2.msgreceivers.RMMessageReceiver;
 import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
 import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RetransmitterBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
 import org.apache.sandesha2.storage.beans.NextMsgBean;
+import org.apache.sandesha2.storage.beans.RetransmitterBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.storage.beans.StorageMapBean;
 import org.apache.sandesha2.util.MsgInitializer;
@@ -51,6 +60,7 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.workers.InOrderInvoker;
+import org.apache.sandesha2.wsrm.AckRequested;
 import org.apache.sandesha2.wsrm.AcknowledgementRange;
 import org.apache.sandesha2.wsrm.Sequence;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
@@ -130,64 +140,7 @@
 		msgsBean.setValue(messagesStr);
 		seqPropMgr.update(msgsBean);
 
-		//Setting the ack depending on AcksTo.
-		//TODO: Stop sending askc for every message.
-		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
-				Constants.SequenceProperties.ACKS_TO_EPR);
-
-		EndpointReference acksTo = (EndpointReference) acksToBean.getValue();
-		String acksToStr = acksTo.getAddress();
-
-		if (acksToStr == null || messagesStr == null)
-			throw new SandeshaException(
-					"Seqeunce properties are not set correctly");
-
-		//if (acksToStr.equals(Constants.WSA.NS_URI_ANONYMOUS)) {
-
-		RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
-		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-		ackMsgCtx.setServiceGroupContext(msgCtx.getServiceGroupContext());
-		ackMsgCtx.setServiceGroupContextId(msgCtx.getServiceGroupContextId());
-		ackMsgCtx.setServiceContext(msgCtx.getServiceContext());
-		ackMsgCtx.setServiceContextID(msgCtx.getServiceContextID());
-
-		//TODO set a suitable operation description
-		OperationContext ackOpContext = new OperationContext(msgCtx
-				.getAxisOperation());
-
-		try {
-			ackOpContext.addMessageContext(ackMsgCtx);
-		} catch (AxisFault e2) {
-			throw new SandeshaException(e2.getMessage());
-		}
-		ackMsgCtx.setOperationContext(ackOpContext);
-
-		//Set new envelope
-		SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
-				Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
-		try {
-			ackMsgCtx.setEnvelope(envelope);
-		} catch (AxisFault e3) {
-			throw new SandeshaException(e3.getMessage());
-		}
-
-		//FIXME set acksTo instead of ReplyTo
-		ackMsgCtx.setTo(acksTo);
-		ackMsgCtx.setReplyTo(msgCtx.getTo());
-		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
-
-		AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
-				.getSystemContext());
-
-		//set CONTEXT_WRITTEN since acksto is anonymous
-		rmMsgCtx.getMessageContext().getOperationContext().setProperty(
-				org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
-		rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN, "true");
-		try {
-			engine.send(ackRMMsgCtx.getMessageContext());
-		} catch (AxisFault e1) {
-			throw new SandeshaException(e1.getMessage());
-		}
+		sendAckIfNeeded(rmMsgCtx,messagesStr);
 
 		//		} else {
 		//			//TODO Add async Ack
@@ -302,5 +255,118 @@
 		}
 
 		return false;
+	}
+	
+	public void sendAckIfNeeded (RMMsgContext rmMsgCtx,String messagesStr) throws SandeshaException {
+		
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		
+		SequencePropertyBeanMgr seqPropMgr = AbstractBeanMgrFactory
+		.getInstance(rmMsgCtx.getContext())
+		.getSequencePropretyBeanMgr();
+		
+		
+		Sequence sequence = (Sequence) rmMsgCtx
+		.getMessagePart(Constants.MessageParts.SEQUENCE);
+		String sequenceId = sequence.getIdentifier().getIdentifier();
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext()
+			.getSystemContext();
+		if (configCtx == null)
+			throw new SandeshaException("Configuration Context is null");
+
+		//Setting the ack depending on AcksTo.
+		//TODO: Stop sending askc for every message.
+		SequencePropertyBean acksToBean = seqPropMgr.retrieve(sequenceId,
+				Constants.SequenceProperties.ACKS_TO_EPR);
+
+		EndpointReference acksTo = (EndpointReference) acksToBean.getValue();
+		String acksToStr = acksTo.getAddress();
+
+		if (acksToStr == null || messagesStr == null)
+			throw new SandeshaException(
+					"Seqeunce properties are not set correctly");
+
+		//if (acksToStr.equals(Constants.WSA.NS_URI_ANONYMOUS)) {
+
+		AxisConfiguration axisConfig = configCtx.getAxisConfiguration();
+		AxisServiceGroup serviceGroup = new AxisServiceGroup (axisConfig);
+		AxisService service = new AxisService (new QName ("RMClientService")); // This is a dummy service.
+		ServiceGroupContext serviceGroupContext = new ServiceGroupContext (configCtx,serviceGroup);
+		ServiceContext serviceContext = new ServiceContext (service,serviceGroupContext);
+		
+
+		RMMsgContext ackRMMsgCtx = SandeshaUtil.deepCopy(rmMsgCtx);
+		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+		
+		ackMsgCtx.setAxisServiceGroup(serviceGroup);
+		ackMsgCtx.setServiceGroupContext(serviceGroupContext);
+		ackMsgCtx.setAxisService(service);
+		ackMsgCtx.setServiceContext(serviceContext);
+		
+		try {
+			AxisOperation ackOperation = AxisOperationFactory.getOperetionDescription(AxisOperationFactory.MEP_CONSTANT_IN_ONLY);
+
+			AxisOperation rmMsgOperation = rmMsgCtx.getMessageContext().getAxisOperation();
+			if (rmMsgOperation!=null) {
+				ArrayList outFlow = rmMsgOperation.getPhasesOutFlow();
+				if (outFlow!=null) {
+					ackOperation.setPhasesOutFlow(outFlow);
+					ackOperation.setPhasesOutFaultFlow(outFlow);
+				}
+			}
+			
+			OperationContext ackOpContext = new OperationContext (ackOperation);
+			ackMsgCtx.setAxisOperation(ackOperation);
+			ackMsgCtx.setOperationContext(ackOpContext);
+			ackOpContext.addMessageContext(ackMsgCtx);
+			ackMsgCtx.setOperationContext(ackOpContext);
+			
+		} catch (AxisFault e) {
+			throw new SandeshaException (e.getMessage());
+		}
+
+		//Set new envelope
+		SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(
+				Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+
+		//FIXME set acksTo instead of ReplyTo
+		ackMsgCtx.setTo(acksTo);
+		ackMsgCtx.setReplyTo(msgCtx.getTo());
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId);
+
+		if (Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo.getAddress())) {
+			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext()
+					.getSystemContext());
+
+
+			//set CONTEXT_WRITTEN since acksto is anonymous
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(
+					org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+			rmMsgCtx.getMessageContext().setProperty(Constants.ACK_WRITTEN, "true");
+			try {
+				engine.send(ackRMMsgCtx.getMessageContext());
+			} catch (AxisFault e1) {
+				throw new SandeshaException(e1.getMessage());
+			}
+		} else {
+			RetransmitterBeanMgr retransmitterBeanMgr = AbstractBeanMgrFactory.getInstance(configCtx).getRetransmitterBeanMgr();
+			
+			String key = SandeshaUtil.storeMessageContext(ackMsgCtx);
+			RetransmitterBean ackBean = new RetransmitterBean ();
+			ackBean.setKey(key);
+			ackBean.setMessageId(ackMsgCtx.getMessageID());
+			ackBean.setReSend(false);
+			ackBean.setSend(true);
+			
+			retransmitterBeanMgr.insert(ackBean);
+			
+			SandeshaUtil.startSenderIfStopped(configCtx);
+		}
+		
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Sun Oct 30 21:19:04 2005
@@ -17,13 +17,22 @@
 
 package org.apache.sandesha2.msgprocessors;
 
+import java.io.SequenceInputStream;
+
 import javax.xml.namespace.QName;
 
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.sandesha2.Constants;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.storage.AbstractBeanMgrFactory;
+import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.StorageMapBeanMgr;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+import org.apache.sandesha2.wsrm.TerminateSequence;
 
 /**
  * @author Chamikara
@@ -45,7 +54,6 @@
 		
 		//Processing the terminate message
 		//TODO Add terminate sequence message logic.
-
 		
 		terminateSeqMsg.setPausedTrue(new QName (Constants.IN_HANDLER_NAME));
 		

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/CreateSeqBeanMgr.java Sun Oct 30 21:19:04 2005
@@ -75,18 +75,37 @@
 
 	public Collection find(CreateSeqBean bean) {
 		ArrayList beans = new ArrayList();
+		if (bean==null)
+			return beans;
+		
 		Iterator iterator = table.values().iterator();
 
 		CreateSeqBean temp;
 		while (iterator.hasNext()) {
 			temp = (CreateSeqBean) iterator.next();
-			if ((bean.getCreateSeqMsgId() != null && bean.getCreateSeqMsgId()
-					.equals(temp.getCreateSeqMsgId()))
-					&& (bean.getSequenceId() != null && bean.getSequenceId()
-							.equals(bean.getSequenceId()))) {
+//			if ((bean.getCreateSeqMsgId() != null && bean.getCreateSeqMsgId()
+//					.equals(temp.getCreateSeqMsgId()))
+//					&& (bean.getSequenceId() != null && bean.getSequenceId()
+//							.equals(bean.getSequenceId()))) {
+//				beans.add(temp);
+//
+//			}
+			
+			boolean equal = true;
+			
+			if (bean.getCreateSeqMsgId()!=null && !bean.getCreateSeqMsgId().equals(temp.getCreateSeqMsgId()))
+				equal = false;
+			
+			if (bean.getSequenceId()!=null && !bean.getSequenceId().equals(temp.getSequenceId()))
+				equal = false;
+			
+			if (bean.getTempSequenceId()!=null && !bean.getTempSequenceId().equals(temp.getTempSequenceId()))
+				equal = false;
+			
+			
+			if (equal)
 				beans.add(temp);
-
-			}
+			
 		}
 		return beans;
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/NextMsgBeanMgr.java Sun Oct 30 21:19:04 2005
@@ -71,20 +71,36 @@
 		ArrayList beans = new ArrayList();
 		Iterator iterator = table.values().iterator();
 
+		if (bean==null)
+			return beans;
+		
 		NextMsgBean temp;
 		while (iterator.hasNext()) {
 			temp = (NextMsgBean) iterator.next();
 
-			if ((bean.getSequenceId() != null && bean.getSequenceId().equals(
-					temp.getSequenceId()))
-					/*
-					 * && (bean.getNextMsgNoToProcess() != null &&
-					 * bean.getNextMsgNoToProcess().equals(temp.getNextMsgNoToProcess()))
-					 */
-					&& (bean.getNextMsgNoToProcess() > 0)) {
-
+//			if ((bean.getSequenceId() != null && bean.getSequenceId().equals(
+//					temp.getSequenceId()))
+//					/*
+//					 * && (bean.getNextMsgNoToProcess() != null &&
+//					 * bean.getNextMsgNoToProcess().equals(temp.getNextMsgNoToProcess()))
+//					 */
+//					&& (bean.getNextMsgNoToProcess() > 0)) {
+//
+//				beans.add(temp);
+//			}
+			
+			
+			boolean equal = true;
+			
+			if (bean.getNextMsgNoToProcess()>0 && bean.getNextMsgNoToProcess()!=temp.getNextMsgNoToProcess())
+				equal = false;
+			
+			if (bean.getSequenceId()!=null && !bean.getSequenceId().equals(temp.getSequenceId()))
+				equal = false;
+			
+			if (equal)
 				beans.add(temp);
-			}
+			
 
 		}
 		return beans;

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/RetransmitterBeanMgr.java Sun Oct 30 21:19:04 2005
@@ -113,8 +113,7 @@
 				int count = temp.getSentCount();
 				long timeNow = System.currentTimeMillis();
 				if (count == 0
-						|| (timeNow > lastSentTime
-								+ Constants.WSP.RETRANSMISSION_INTERVAL)) {
+						|| (timeNow > (lastSentTime + Constants.WSP.RETRANSMISSION_INTERVAL))) {
 					beans.add(temp);
 				}
 			}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beanmanagers/SequencePropertyBeanMgr.java Sun Oct 30 21:19:04 2005
@@ -67,21 +67,40 @@
 
 	public Collection find(SequencePropertyBean bean) {
 		ArrayList beans = new ArrayList();
+		
+		if (bean==null)
+			return beans;
+		
 		Iterator iterator = table.values().iterator();
 		SequencePropertyBean temp;
 
 		while (iterator.hasNext()) {
 			temp = (SequencePropertyBean) iterator.next();
 
-			if ((bean.getSequenceId() != null && bean.getSequenceId().equals(
-					temp.getSequenceId()))
-					&& (bean.getName() != null && bean.getName().equals(
-							temp.getName()))
-					&& (bean.getValue() != null && bean.getValue().equals(
-							temp.getValue()))) {
-
+//			if ((bean.getSequenceId() != null && bean.getSequenceId().equals(
+//					temp.getSequenceId()))
+//					&& (bean.getName() != null && bean.getName().equals(
+//							temp.getName()))
+//					&& (bean.getValue() != null && bean.getValue().equals(
+//							temp.getValue()))) {
+//
+//				beans.add(temp);
+//			}
+			
+			boolean equal = true;
+			
+			if (bean.getSequenceId()!=null && !bean.getSequenceId().equals(temp.getSequenceId()))
+				equal = false;
+			
+			if (bean.getName()!=null && !bean.getName().equals(temp.getName()))
+				equal = false;
+			
+			if (bean.getValue()!=null && !bean.getValue().equals(temp.getValue()))
+				equal = false;
+			
+			if (equal)
 				beans.add(temp);
-			}
+			
 		}
 		return beans;
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/StorageMapBean.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/StorageMapBean.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/StorageMapBean.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/beans/StorageMapBean.java Sun Oct 30 21:19:04 2005
@@ -29,6 +29,8 @@
 	private long MsgNo;
 
 	private String sequenceId;
+	
+	private boolean invoked = false;
 
 	public StorageMapBean() {
 
@@ -83,5 +85,13 @@
 	 */
 	public void setSequenceId(String sequenceId) {
 		this.sequenceId = sequenceId;
+	}
+	
+	public boolean isInvoked() {
+		return invoked;
+	}
+	
+	public void setInvoked(boolean invoked) {
+		this.invoked = invoked;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/RMMsgCreator.java Sun Oct 30 21:19:04 2005
@@ -17,6 +17,9 @@
 
 package org.apache.sandesha2.util;
 
+import java.util.ArrayList;
+
+import javax.xml.namespace.QName;
 import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamWriter;
 
@@ -28,8 +31,15 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.context.ServiceGroupContext;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.AxisOperationFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.AxisServiceGroup;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.axis2.engine.AxisConfigurationImpl;
+import org.apache.axis2.engine.Phase;
 import org.apache.axis2.om.OMAbstractFactory;
 import org.apache.axis2.om.impl.MIMEOutputUtils;
 import org.apache.axis2.soap.SOAPEnvelope;
@@ -109,10 +119,12 @@
 		try {
 			AxisOperation appMsgOperationDesc = applicationMsgContext.getAxisOperation();
 			AxisOperation createSeqOperationDesc = AxisOperationFactory.getOperetionDescription(AxisOperationFactory.MEP_CONSTANT_OUT_IN);
-			createSeqOperationDesc.setPhasesOutFlow(appMsgOperationDesc.getPhasesOutFlow());
-			createSeqOperationDesc.setPhasesOutFaultFlow(appMsgOperationDesc.getPhasesOutFaultFlow());
-			createSeqOperationDesc.setPhasesInFaultFlow(appMsgOperationDesc.getPhasesInFaultFlow());
-			createSeqOperationDesc.setRemainingPhasesInFlow(appMsgOperationDesc.getRemainingPhasesInFlow());
+			if (appMsgOperationDesc!=null) {
+				createSeqOperationDesc.setPhasesOutFlow(appMsgOperationDesc.getPhasesOutFlow());
+				createSeqOperationDesc.setPhasesOutFaultFlow(appMsgOperationDesc.getPhasesOutFaultFlow());
+				createSeqOperationDesc.setPhasesInFaultFlow(appMsgOperationDesc.getPhasesInFaultFlow());
+				createSeqOperationDesc.setRemainingPhasesInFlow(appMsgOperationDesc.getRemainingPhasesInFlow());
+			}
 			
 			createSeqmsgContext.setAxisOperation(createSeqOperationDesc);
 			//TODO set a suitable ope. description
@@ -201,7 +213,7 @@
 		}
 
 		createSeqRMMsg.setAction(Constants.WSRM.ACTION_CREATE_SEQ);
-		
+		createSeqRMMsg.setSOAPAction("\"" + Constants.WSRM.ACTION_CREATE_SEQ + "\"");
 		createSeqRMMsg.setMessageId(createSeqMsgId);
 
 		MessageContext createSeqMsg = createSeqRMMsg.getMessageContext();
@@ -230,30 +242,43 @@
 		terminateMessage.setMessageInformationHeaders(newMessageInfoHeaders);
 		terminateMessage.setMessageID(SandeshaUtil.getUUID());
 		
-		terminateMessage.setServiceGroupContext(referenceMessage
-				.getServiceGroupContext());
-		terminateMessage.setServiceGroupContextId(referenceMessage
-				.getServiceGroupContextId());
-		terminateMessage.setServiceContext(referenceMessage
-				.getServiceContext());
-		terminateMessage.setServiceContextID(referenceMessage
-				.getServiceContextID());
 		
-		terminateMessage.setAxisOperation(referenceMessage.getAxisOperation());
-		OperationContext newOperationCtx = new OperationContext (terminateMessage.getAxisOperation());
+		ConfigurationContext configCtx = referenceMessage.getSystemContext();
+		if (configCtx==null)
+			throw new SandeshaException ("Configuration Context is null");
+		
+		AxisConfiguration axisConfig = configCtx.getAxisConfiguration();
+		AxisServiceGroup serviceGroup = new AxisServiceGroup (axisConfig);
+		AxisService service = new AxisService (new QName ("RMClientService")); // This is a dummy service.
+		ServiceGroupContext serviceGroupContext = new ServiceGroupContext (configCtx,serviceGroup);
+		ServiceContext serviceContext = new ServiceContext (service,serviceGroupContext);
+		
+		terminateMessage.setAxisServiceGroup(serviceGroup);
+		terminateMessage.setServiceGroupContext(serviceGroupContext);
+		terminateMessage.setAxisService(service);
+		terminateMessage.setServiceContext(serviceContext);
+		
 		try {
-			newOperationCtx.addMessageContext(terminateMessage);
+			AxisOperation terminateOperaiton = AxisOperationFactory.getOperetionDescription(AxisOperationFactory.MEP_CONSTANT_IN_ONLY);
+			AxisOperation referenceMsgOperation = referenceMessage.getAxisOperation();
+			if (referenceMsgOperation!=null) {
+				ArrayList outPhases = referenceMsgOperation.getPhasesOutFlow();
+				if (outPhases!=null) {
+					terminateOperaiton.setPhasesOutFlow(outPhases);
+					terminateOperaiton.setPhasesOutFaultFlow(outPhases);
+				}
+			}
+			
+			OperationContext terminateOpContext = new OperationContext (terminateOperaiton);
+			terminateMessage.setAxisOperation(terminateOperaiton);
+			terminateMessage.setOperationContext(terminateOpContext);
+			terminateOpContext.addMessageContext(terminateMessage);
+			terminateMessage.setOperationContext(terminateOpContext);
+			
 		} catch (AxisFault e) {
 			throw new SandeshaException (e.getMessage());
 		}
 		
-		terminateMessage.setOperationContext(newOperationCtx);
-		
-		ConfigurationContext configCtx = terminateMessage.getSystemContext();
-		if (configCtx==null)
-			throw new SandeshaException ("Configuration Context is null");
-		configCtx.registerOperationContext(terminateMessage.getMessageID(),newOperationCtx);
-
 		SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
 		terminateRMMessage.setSOAPEnvelop(envelope);
 		
@@ -306,7 +331,8 @@
 				Constants.SOAPVersion.DEFAULT).getDefaultEnvelope();
 		response.toOMElement(envelope.getBody());
 		outMessage.setWSAAction(Constants.WSRM.NS_URI_CREATE_SEQ_RESPONSE);
-
+		outMessage.setSoapAction(Constants.WSRM.NS_URI_CREATE_SEQ_RESPONSE);
+		
 		String newMessageId = SandeshaUtil.getUUID();
 		outMessage.setMessageID(newMessageId);
 
@@ -357,6 +383,7 @@
 
 		sequenceAck.toOMElement(envelope.getHeader());
 		applicationMsg.setAction(Constants.WSRM.ACTION_SEQ_ACK);
+		applicationMsg.setSOAPAction(Constants.WSRM.ACTION_SEQ_ACK);
 		applicationMsg.setMessageId(SandeshaUtil.getUUID());
 
 	}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Sun Oct 30 21:19:04 2005
@@ -148,7 +148,7 @@
 		return input;
 	}
 
-	private static long[] getLongArr(String[] strings) {
+	public static long[] getLongArr(String[] strings) {
 		int length = strings.length;
 		long[] longs = new long[length];
 		for (int i = 0; i < length; i++) {
@@ -192,8 +192,8 @@
 			newMessageContext.setProcessingFault(msgCtx.isProcessingFault());
 			newMessageContext.setResponseWritten(msgCtx.isResponseWritten());
 			newMessageContext.setRestThroughPOST(msgCtx.isRestThroughPOST());
-			newMessageContext.setAxisOperation(msgCtx
-					.getAxisOperation());
+			if (msgCtx.getAxisOperation()!=null)
+				newMessageContext.setAxisOperation(msgCtx.getAxisOperation());
 
 			if (msgCtx.getEnvelope() != null)
 				newMessageContext.setEnvelope(msgCtx.getEnvelope());
@@ -411,5 +411,47 @@
 		default:
 			return "Error";	
 		}
+	}
+	
+	public static boolean isGloballyProcessableMessageType (int type) {
+		if (type==Constants.MessageTypes.ACK || type==Constants.MessageTypes.TERMINATE_SEQ) {
+			return true;
+		}
+		
+		return false;
+	}
+	
+	public static boolean isDuplicateDropRequiredMsgType (int rmMessageType) {
+		if (rmMessageType==Constants.MessageTypes.APPLICATION)
+			return true;
+		
+		if (rmMessageType==Constants.MessageTypes.CREATE_SEQ_RESPONSE)
+			return true;
+		
+		return false;
+	}
+	
+	//TODO: correct following to work for long.
+	public static ArrayList getSplittedMsgNoArraylist (String str) {
+		String[] splitted = str.split(",");
+		ArrayList results = new ArrayList ();
+		
+		long count = splitted.length;
+		for (int i=0;i<count;i++) {
+			String s = splitted[i];
+			results.add(s);
+		}
+		
+		return results;
+	}
+	
+	public static String getServerSideIncomingSeqIdFromInternalSeqId (String internalSequenceId) {
+		String incomingSequenceId = internalSequenceId;
+		return incomingSequenceId;
+	}
+	
+	public static String getServerSideInternalSeqIdFromIncomingSeqId (String incomingSequenceId) {
+		String internalSequenceId = incomingSequenceId;
+		return internalSequenceId;
 	}
 }

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Sun Oct 30 21:19:04 2005
@@ -82,6 +82,8 @@
 				Thread.sleep(1000);
 			} catch (InterruptedException ex) {
 				System.out.println("Invoker was Inturrepted....");
+				ex.printStackTrace();
+				System.out.println("End printing Interrupt...");
 			}
 			
 
@@ -127,17 +129,17 @@
 					
 						MessageContext msgToInvoke = SandeshaUtil.getStoredMessageContext(key);
 					
-						//removing the storage map entry.
-						storageMapMgr.delete(key);
-					
 						RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
 						Sequence seq = (Sequence) rmMsg.getMessagePart(Constants.MessageParts.SEQUENCE);
 						long msgNo = seq.getMessageNumber().getMessageNumber();
 					
-						System.out.println("Invoking message number " + msgNo + " of the sequence " + sequenceId);
 						try {
 							//Invoking the message.
 							new AxisEngine (msgToInvoke.getSystemContext()).receive(msgToInvoke);
+							
+							//deleting the message entry.							
+							storageMapMgr.delete(key);
+							
 						} catch (AxisFault e) {
 							throw new SandeshaException (e.getMessage());
 						}

Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=329749&r1=329748&r2=329749&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Sun Oct 30 21:19:04 2005
@@ -17,6 +17,7 @@
 package org.apache.sandesha2.workers;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
@@ -107,6 +108,8 @@
 					e1.printStackTrace();
 				} catch (SandeshaException e2) {
 					e2.printStackTrace();
+				} catch (Exception e3) {
+					e3.printStackTrace();
 				}
 
 				//changing the values of the sent bean.
@@ -127,6 +130,8 @@
 			} catch (InterruptedException e1) {
 				//e1.printStackTrace();
 				System.out.println("Sender was interupted...");
+				e1.printStackTrace();
+				System.out.println("End printing Interrupt...");
 			}
 		}
 
@@ -190,6 +195,13 @@
 			response.setServiceContext(msgCtx.getServiceContext());
 			response.setAxisService (msgCtx.getAxisService());
 			response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());
+			
+			//setting the in-flow.
+			//ArrayList inPhaseHandlers = response.getAxisOperation().getRemainingPhasesInFlow();
+			/*if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
+				ArrayList phases = msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
+				response.getAxisOperation().setRemainingPhasesInFlow(phases);
+			}*/
 			
 			//Changed following from TransportUtils to SandeshaUtil since op. context is anavailable.
 			SOAPEnvelope resenvelope = null;



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org