You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/27 00:14:37 UTC

svn commit: r468170 - in /webservices/sandesha/trunk/java: ./ src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/msgprocessors/ src/org/apache/sandesha2/transport/

Author: chamikara
Date: Thu Oct 26 15:14:35 2006
New Revision: 468170

URL: http://svn.apache.org/viewvc?view=rev&rev=468170
Log:
Applied the patch from Matt on Sandesha2-32.


Modified:
    webservices/sandesha/trunk/java/project.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java

Modified: webservices/sandesha/trunk/java/project.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/project.properties?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/project.properties (original)
+++ webservices/sandesha/trunk/java/project.properties Thu Oct 26 15:14:35 2006
@@ -10,7 +10,7 @@
 maven.repo.remote=http://www.ibiblio.org/maven/,\
                   http://people.apache.org/repository/,\
                   http://dist.codehaus.org/,\
-                  http://ws.zones.apache.org/~dims/maven/
+                  http://ws.zones.apache.org/repository/
 
 # XDOC PLUGIN
 ###############

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Thu Oct 26 15:14:35 2006
@@ -31,15 +31,12 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
 import org.apache.axis2.handlers.AbstractHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaListener;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
@@ -62,12 +59,14 @@
 	private static final long serialVersionUID = -7187928423123306156L;
 
 	private static final Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
-
-	public void invoke(MessageContext msgContext) throws AxisFault {
+	
+	public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: SandeshaGlobalInHandler::invoke, " + msgContext.getEnvelope().getHeader());
 
+		InvocationResponse returnValue = InvocationResponse.CONTINUE;
+		
 		ConfigurationContext configContext = msgContext.getConfigurationContext();
 		if (configContext == null)
 			throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
@@ -78,21 +77,21 @@
 
 		String reinjectedMessage = (String) msgContext.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
 		if (reinjectedMessage != null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage))
-			return; // Reinjected messages are not processed by Sandesha2 inflow
-					// handlers
+			return returnValue; // Reinjected messages are not processed by Sandesha2 inflow
+													// handlers
 
 		StorageManager storageManager = null;
 		try {
 			storageManager = SandeshaUtil
 					.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
 			if (storageManager == null) {
-				log.debug("Sandesha2 cannot proceed. The StorageManager is not available");
-				return;
+				log.debug("Sandesha2 cannot proceed. The StorageManager is not available " + returnValue);
+				return returnValue;
 			}
 		} catch (SandeshaException e1) {
 			// TODO make this a log
 			log.debug("Sandesha2 cannot proceed. Exception thrown when looking for the StorageManager", e1);
-			return;
+			return returnValue;
 		}
 
 		boolean withinTransaction = false;
@@ -138,6 +137,7 @@
 					faultManager.manageIncomingRMFault (axisFault, msgContext);
 					
 					msgContext.pause();
+					returnValue = InvocationResponse.SUSPEND;
 				}
 				
 			}
@@ -147,8 +147,8 @@
 			boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage(msgContext);
 			if (!isRMGlobalMessage) {
 				if (log.isDebugEnabled())
-					log.debug("Exit: SandeshaGlobalInHandler::invoke, !isRMGlobalMessage");
-				return;
+					log.debug("Exit: SandeshaGlobalInHandler::invoke, !isRMGlobalMessage " + returnValue);
+				return returnValue;
 			}
 
 			RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
@@ -156,10 +156,11 @@
 			// Dropping duplicates
 			boolean dropped = dropIfDuplicate(rmMessageContext, storageManager);
 			if (dropped) {
+				returnValue = InvocationResponse.SUSPEND; //the msg has been paused
 				processDroppedMessage(rmMessageContext, storageManager);
 				if (log.isDebugEnabled())
-					log.debug("Exit: SandeshaGlobalInHandler::invoke, dropped");
-				return;
+					log.debug("Exit: SandeshaGlobalInHandler::invoke, dropped " + returnValue);
+				return returnValue;
 			}
 
 			// Persisting the application messages
@@ -177,6 +178,7 @@
 		} catch (Exception e) {
 			// message should not be sent in a exception situation.
 			msgContext.pause();
+			returnValue = InvocationResponse.SUSPEND;
 
 			if (!withinTransaction) {
 				try {
@@ -205,7 +207,8 @@
 			}
 		}
 		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaGlobalInHandler::invoke");
+			log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
+		return returnValue;
 	}
 
 	private boolean dropIfDuplicate(RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Oct 26 15:14:35 2006
@@ -53,12 +53,14 @@
 	public String getName() {
 		return Sandesha2Constants.IN_HANDLER_NAME;
 	}
-
-	public void invoke(MessageContext msgCtx) throws AxisFault {
+	
+	public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
 		
 		if (log.isDebugEnabled())
 			log.debug("Enter: SandeshaInHandler::invoke, " + msgCtx.getEnvelope().getHeader());
 
+		InvocationResponse returnValue = InvocationResponse.CONTINUE;
+		
 		ConfigurationContext context = msgCtx.getConfigurationContext();
 		if (context == null) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
@@ -69,15 +71,15 @@
 		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
 		if (null != DONE && "true".equals(DONE)) {
 			if (log.isDebugEnabled())
-				log.debug("Exit: SandeshaInHandler::invoke, Application processing done");
-			return;
+				log.debug("Exit: SandeshaInHandler::invoke, Application processing done " + returnValue);
+			return returnValue;
 		}
 
 		String reinjectedMessage = (String) msgCtx.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
 		if (reinjectedMessage != null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage)) {
 			if (log.isDebugEnabled())
-				log.debug("Exit: SandeshaInHandler::invoke, reinjectedMessage");
-			return; // Reinjected messages are not processed by Sandesha2 inflow
+				log.debug("Exit: SandeshaInHandler::invoke, reinjectedMessage " + returnValue);
+			return returnValue; // Reinjected messages are not processed by Sandesha2 inflow
 					// handlers
 		}
 		
@@ -100,11 +102,15 @@
 
 			// Process Ack headers in the message
 			AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
-			ackProcessor.processAckHeaders(msgCtx);
+			if(ackProcessor.processAckHeaders(msgCtx)){
+				returnValue = InvocationResponse.SUSPEND;
+			}
 
 			// Process Ack Request headers in the message
 			AckRequestedProcessor reqProcessor = new AckRequestedProcessor();
-			reqProcessor.processAckRequestedHeaders(msgCtx);
+			if(reqProcessor.processAckRequestedHeaders(msgCtx)){
+				returnValue = InvocationResponse.SUSPEND;
+			}
 
 			AxisService axisService = msgCtx.getAxisService();
 			if (axisService == null) {
@@ -125,6 +131,7 @@
 			//Ack messages will be paused
 			if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK) {
 				rmMsgCtx.pause();
+				returnValue = InvocationResponse.SUSPEND;
 			}
 			
 			// validating the message
@@ -132,14 +139,20 @@
 
 			MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
 
-			if (msgProcessor != null)
-				msgProcessor.processInMessage(rmMsgCtx);
+			if (msgProcessor != null){
+				if(msgProcessor.processInMessage(rmMsgCtx)){
+					//this paused the msg
+					returnValue = InvocationResponse.SUSPEND;
+				}
+			}
+				
 
 			
 		} catch (AxisFault e) {
 			// message should not be sent in a exception situation.
 			msgCtx.pause();
-
+			returnValue = InvocationResponse.SUSPEND;
+			
 			if (!withinTransaction) {
 				try {
 					transaction.rollback();
@@ -164,7 +177,8 @@
 			}
 		}
 		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaInHandler::invoke");
+			log.debug("Exit: SandeshaInHandler::invoke " + returnValue);
+		return returnValue;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Thu Oct 26 15:14:35 2006
@@ -1,188 +1,197 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.handlers;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContextFactory;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.handlers.AbstractHandler;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
-import org.apache.sandesha2.msgprocessors.MsgProcessor;
-import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.wsrm.Sequence;
-
-/**
- * This is invoked in the outFlow of an RM endpoint
- */
-
-public class SandeshaOutHandler extends AbstractHandler {
-
-	private static final long serialVersionUID = 8261092322051924103L;
-
-	private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
-
-	public void invoke(MessageContext msgCtx) throws AxisFault {
-		if (log.isDebugEnabled())
-			log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
-
-		ConfigurationContext context = msgCtx.getConfigurationContext();
-		if (context == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
-			log.debug(message);
-			throw new AxisFault(message);
-		}
-
-		AxisService axisService = msgCtx.getAxisService();
-		if (axisService == null) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
-			log.debug(message);
-			throw new AxisFault(message);
-		}
-
-		//see if this message is unreliable i.e. WSRM not requried
-		//look at the msg ctx first
-		{
-			String unreliable = (String) msgCtx.getProperty(SandeshaClientConstants.UNRELIABLE_MESSAGE);
-			if (null != unreliable && "true".equals(unreliable)) {
-				if (log.isDebugEnabled())
-					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message");
-				return;
-			}			
-		}
-		//look at the operation ctx
-		{
-			Parameter unreliable = msgCtx.getAxisOperation().getParameter(SandeshaClientConstants.UNRELIABLE_MESSAGE);
-			if (null != unreliable && "true".equals(unreliable.getValue())) {
-				if (log.isDebugEnabled())
-					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message");
-				return;
-			}				
-		}
-		// Also do not apply RM to fault messages
-		{
-			if(msgCtx.isProcessingFault()) {
-				if(log.isDebugEnabled())
-					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message");
-				return;
-			}
-		}
-		
-		//this will change the execution chain of this message to work correctly in retransmissions.
-		//For e.g. Phases like security will be removed to be called in each retransmission.
-		SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
-
-		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
-		if (null != DONE && "true".equals(DONE)) {
-			if (log.isDebugEnabled())
-				log.debug("Exit: SandeshaOutHandler::invoke, Application processing done");
-			return;
-		}
-		
-		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
-
-		boolean withinTransaction = false;
-		String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
-		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
-			withinTransaction = true;
-		}
-
-		Transaction transaction = null;
-		if (!withinTransaction) {
-			transaction = storageManager.getTransaction();
-			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
-		}
-		boolean rolebacked = false;
-
-		try {
-			// getting rm message
-			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-
-			MsgProcessor msgProcessor = null;
-			int messageType = rmMsgCtx.getMessageType();
-			if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
-				MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
-						OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-
-                if (msgCtx.isServerSide()) { // for the server side
-                    RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
-                    Sequence sequencePart = (Sequence) reqRMMsgCtx
-                                    .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-                    if (sequencePart != null)
-                            msgProcessor = new ApplicationMsgProcessor();// a rm intended message
-                } else // if client side.
-                    msgProcessor = new ApplicationMsgProcessor();
-                
-			} else {
-				msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
-			}
-
-			if (msgProcessor != null)
-				msgProcessor.processOutMessage(rmMsgCtx);
-
-		} catch (Exception e) {
-			// message should not be sent in a exception situation.
-			msgCtx.pause();
-
-			// rolling back the transaction
-			if (!withinTransaction) {
-				try {
-					transaction.rollback();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-					rolebacked = true;
-				} catch (Exception e1) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
-					log.debug(message, e);
-				}
-			}
-
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
-			throw new AxisFault(message, e);
-		} finally {
-			if (!withinTransaction && !rolebacked) {
-				try {
-					transaction.commit();
-					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
-				} catch (Exception e) {
-					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
-					log.debug(message, e);
-				}
-			}
-		}
-		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaOutHandler::invoke");
-	}
-
-	public String getName() {
-		return Sandesha2Constants.OUT_HANDLER_NAME;
-	}
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.handlers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is invoked in the outFlow of an RM endpoint
+ */
+
+public class SandeshaOutHandler extends AbstractHandler {
+
+	private static final long serialVersionUID = 8261092322051924103L;
+
+	private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
+
+	public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+
+		InvocationResponse returnValue = InvocationResponse.CONTINUE;
+		
+		ConfigurationContext context = msgCtx.getConfigurationContext();
+		if (context == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		AxisService axisService = msgCtx.getAxisService();
+		if (axisService == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		//see if this message is unreliable i.e. WSRM not requried
+		//look at the msg ctx first
+		{
+			String unreliable = (String) msgCtx.getProperty(SandeshaClientConstants.UNRELIABLE_MESSAGE);
+			if (null != unreliable && "true".equals(unreliable)) {
+				if (log.isDebugEnabled())
+					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+				return returnValue;
+			}			
+		}
+		//look at the operation ctx
+		{
+			Parameter unreliable = msgCtx.getAxisOperation().getParameter(SandeshaClientConstants.UNRELIABLE_MESSAGE);
+			if (null != unreliable && "true".equals(unreliable.getValue())) {
+				if (log.isDebugEnabled())
+					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+				return returnValue;
+			}				
+		}
+		// Also do not apply RM to fault messages
+		{
+			if(msgCtx.isProcessingFault()) {
+				if(log.isDebugEnabled())
+					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message " + returnValue);
+				return returnValue ;
+			}
+		}
+		
+		//this will change the execution chain of this message to work correctly in retransmissions.
+		//For e.g. Phases like security will be removed to be called in each retransmission.
+		SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+
+		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+		if (null != DONE && "true".equals(DONE)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SandeshaOutHandler::invoke, Application processing done " + returnValue);
+			return returnValue;
+		}
+		
+		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+		boolean withinTransaction = false;
+		String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+		if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+			withinTransaction = true;
+		}
+
+		Transaction transaction = null;
+		if (!withinTransaction) {
+			transaction = storageManager.getTransaction();
+			msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+		}
+		boolean rolebacked = false;
+
+		try {
+			// getting rm message
+			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+			MsgProcessor msgProcessor = null;
+			int messageType = rmMsgCtx.getMessageType();
+			if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+				MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
+						OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+
+                if (msgCtx.isServerSide()) { // for the server side
+                    RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+                    Sequence sequencePart = (Sequence) reqRMMsgCtx
+                                    .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+                    if (sequencePart != null)
+                            msgProcessor = new ApplicationMsgProcessor();// a rm intended message
+                } else // if client side.
+                    msgProcessor = new ApplicationMsgProcessor();
+                
+			} else {
+				msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+			}
+
+			if (msgProcessor != null){
+				if(msgProcessor.processOutMessage(rmMsgCtx)){
+					//the msg was paused
+					returnValue = InvocationResponse.SUSPEND;
+				}
+			}
+				
+
+		} catch (Exception e) {
+			// message should not be sent in a exception situation.
+			msgCtx.pause();
+			returnValue = InvocationResponse.SUSPEND;
+
+			// rolling back the transaction
+			if (!withinTransaction) {
+				try {
+					transaction.rollback();
+					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+					rolebacked = true;
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+					log.debug(message, e);
+				}
+			}
+
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
+			throw new AxisFault(message, e);
+		} finally {
+			if (!withinTransaction && !rolebacked) {
+				try {
+					transaction.commit();
+					msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
+				}
+			}
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
+		return returnValue;
+	}
+
+	public String getName() {
+		return Sandesha2Constants.OUT_HANDLER_NAME;
+	}
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Thu Oct 26 15:14:35 2006
@@ -53,7 +53,6 @@
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
 import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.RMMsgCreator;
 import org.apache.sandesha2.util.SOAPAbstractFactory;
@@ -69,13 +68,13 @@
 
 	private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
 
-	public void processAckRequestedHeaders(MessageContext message) throws AxisFault {
+	public boolean processAckRequestedHeaders(MessageContext message) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
 
 		SOAPEnvelope envelope = message.getEnvelope();
 		SOAPHeader header = envelope.getHeader();
-		
+		boolean msgCtxPaused = false;
 		if(header!=null)
 		{
 			for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -86,16 +85,29 @@
 					OMElement ack = (OMElement) acks.next();
 					AckRequested ackReq = new AckRequested(headerName.getNamespaceURI());
 					ackReq.fromOMElement(ack);
-					processAckRequestedHeader(message, ack, ackReq);
+					boolean paused = processAckRequestedHeader(message, ack, ackReq);
+					//if nto already paused we might be now
+					if(!msgCtxPaused){
+						msgCtxPaused = paused;
+					}
 				}
 			}			
 		}
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders");
+			log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders " + msgCtxPaused);
+		return msgCtxPaused;
 	}
 
-	public void processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+	/**
+	 * 
+	 * @param msgContext
+	 * @param soapHeader
+	 * @param ackRequested
+	 * @return true if the msg context was paused
+	 * @throws AxisFault
+	 */
+	public boolean processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
 
@@ -295,8 +307,10 @@
 			msgContext.pause();
 
 			if (log.isDebugEnabled())
-				log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader");
+				log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.TRUE);
+			return true;
 		}
+		return false;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Thu Oct 26 15:14:35 2006
@@ -29,7 +29,6 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.engine.AxisEngine;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.RMMsgContext;
@@ -40,10 +39,8 @@
 import org.apache.sandesha2.security.SecurityManager;
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
 import org.apache.sandesha2.storage.beans.SequencePropertyBean;
 import org.apache.sandesha2.util.AcknowledgementManager;
@@ -65,13 +62,18 @@
 
 	private static final Log log = LogFactory.getLog(AcknowledgementProcessor.class);
 
-	public void processAckHeaders(MessageContext message) throws AxisFault {
+	/**
+	 * @param message
+	 * @return true if the msg context was paused
+	 * @throws AxisFault
+	 */
+	public boolean processAckHeaders(MessageContext message) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
 
 		SOAPEnvelope envelope = message.getEnvelope();
 		SOAPHeader header = envelope.getHeader();
-		
+		boolean returnValue = false;
 		if(header!=null)
 		{
 			for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -82,21 +84,35 @@
 					OMElement ack = (OMElement) acks.next();
 					SequenceAcknowledgement seqAck = new SequenceAcknowledgement(headerName.getNamespaceURI());
 					seqAck.fromOMElement(ack);
-					processAckHeader(message, ack, seqAck);
+					boolean ackPaused = processAckHeader(message, ack, seqAck);
+					//if not already paused we might be now
+					if(!returnValue){
+						returnValue = ackPaused;
+					}
 				}
 			}			
 		}
 
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
+			log.debug("Exit: AcknowledgementProcessor::processAckHeaders " + returnValue);
+		return returnValue;
 	}
 	
-	private void processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+	/**
+	 * @param msgCtx
+	 * @param soapHeader
+	 * @param sequenceAck
+	 * @return true if the msg context was paused
+	 * @throws AxisFault
+	 */
+	private boolean processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
 		throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
 		
+		boolean returnValue = false;
+		
 		// TODO: Note that this RMMessageContext is not really any use - but we need to create it
 		// so that it can be passed to the fault handling chain. It's really no more than a
 		// container for the correct addressing and RM spec levels, so we'd be better off passing
@@ -247,11 +263,13 @@
 
 		String action = msgCtx.getOptions().getAction();
 		if (action!=null && action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion()))) {
+			returnValue = true;
 			rmMsgCtx.pause();
 		}
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: AcknowledgementProcessor::processAckHeader");
+			log.debug("Exit: AcknowledgementProcessor::processAckHeader " + returnValue);
+		return returnValue;
 	}
 
 	private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -86,10 +86,13 @@
 
 	private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
 
-	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processInMessage");
 
+		boolean msgCtxPaused = false;
+		
 		// Processing the application message.
 		MessageContext msgCtx = rmMsgCtx.getMessageContext();
 		if (msgCtx == null) {
@@ -100,7 +103,7 @@
 
 		if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
 				&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
-			return;
+			return msgCtxPaused;
 		}
 
 		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
@@ -232,6 +235,7 @@
 			// this is a duplicate message and the invocation type is
 			// EXACTLY_ONCE.
 			rmMsgCtx.pause();
+			msgCtxPaused = true;
 		}
 
 		if (!msgNoPresentInList)
@@ -309,6 +313,7 @@
 
 			// pause the message
 			rmMsgCtx.pause();
+			msgCtxPaused = true;
 
 			// Starting the invoker if stopped.
 			SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
@@ -319,7 +324,8 @@
 		sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: ApplicationMsgProcessor::processInMessage");
+			log.debug("Exit: ApplicationMsgProcessor::processInMessage " + msgCtxPaused);
+		return msgCtxPaused;
 	}
 
 	// TODO convert following from INT to LONG
@@ -461,7 +467,7 @@
 			log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
 	}
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
 
@@ -866,7 +872,8 @@
 		msgContext.pause();
 		
 		if (log.isDebugEnabled())
-			log.debug("Exit: ApplicationMsgProcessor::processOutMessage");
+			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+		return true;
 	}
 
 	private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, String sequencePropertyKey, String internalSequenceId, EndpointReference acksTo,

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Thu Oct 26 15:14:35 2006
@@ -1,168 +1,170 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *  
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.Utils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Responsible for processing an incoming Close Sequence message. (As introduced
- * by the WSRM 1.1 specification)
- */
-
-public class CloseSequenceProcessor implements MsgProcessor {
-
-	private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
-
-	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
-		if (log.isDebugEnabled())
-			log.debug("Enter: CloseSequenceProcessor::processInMessage");
-
-		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
-		CloseSequence closeSequence = (CloseSequence) rmMsgCtx
-				.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
-
-		MessageContext msgCtx = rmMsgCtx.getMessageContext();
-
-		String sequenceId = closeSequence.getIdentifier().getIdentifier();
-		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-		
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
-				.getAxisConfiguration());
-		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
-		
-		// Check that the sender of this CloseSequence holds the correct token
-		SequencePropertyBean tokenBean = sequencePropMgr.retrieve(sequenceId, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
-		if(tokenBean != null) {
-			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
-			OMElement body = msgCtx.getEnvelope().getBody();
-			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
-			secManager.checkProofOfPossession(token, body, msgCtx);
-		}
-
-		FaultManager faultManager = new FaultManager();
-		SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
-		if (fault != null) {
-			throw fault;
-		}
-
-		SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
-		sequenceClosedBean.setSequencePropertyKey(sequencePropertyKey);
-		sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
-		sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
-
-		sequencePropMgr.insert(sequenceClosedBean);
-
-		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
-
-		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
-		String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
-		ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
-
-		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
-				.getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
-
-		// Setting new envelope
-		SOAPEnvelope envelope = factory.getDefaultEnvelope();
-		try {
-			ackMsgCtx.setEnvelope(envelope);
-		} catch (AxisFault e3) {
-			throw new SandeshaException(e3.getMessage());
-		}
-
-		// adding the ack part to the envelope.
-		Iterator sequenceAckIter = ackRMMsgCtx
-				.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-
-		MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
-
-		MessageContext closeSequenceResponseMsg = null;
-
-		try {
-			closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
-		} catch (AxisFault e1) {
-			throw new SandeshaException(e1);
-		}
-
-		RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg,
-				storageManager);
-
-		while (sequenceAckIter.hasNext()) {
-			SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
-			closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
-					sequenceAcknowledgement);
-		}
-		
-		closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
-		closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
-		closeSequenceResponseMsg.setResponseWritten(true);
-
-		closeSeqResponseRMMsg.addSOAPEnvelope();
-
-		AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
-
-		try {
-			engine.send(closeSequenceResponseMsg);
-		} catch (AxisFault e) {
-			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse,
-					sequenceId, e.toString());
-			throw new SandeshaException(message, e);
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: CloseSequenceProcessor::processInMessage");
-	}
-
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: CloseSequenceProcessor::processOutMessage");
-			log.debug("Exit: CloseSequenceProcessor::processOutMessage");
-		}
-
-	}
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing an incoming Close Sequence message. (As introduced
+ * by the WSRM 1.1 specification)
+ */
+
+public class CloseSequenceProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
+
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: CloseSequenceProcessor::processInMessage");
+
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		CloseSequence closeSequence = (CloseSequence) rmMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+
+		String sequenceId = closeSequence.getIdentifier().getIdentifier();
+		String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+		SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
+		
+		// Check that the sender of this CloseSequence holds the correct token
+		SequencePropertyBean tokenBean = sequencePropMgr.retrieve(sequenceId, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+		if(tokenBean != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+			OMElement body = msgCtx.getEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+			secManager.checkProofOfPossession(token, body, msgCtx);
+		}
+
+		FaultManager faultManager = new FaultManager();
+		SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
+		if (fault != null) {
+			throw fault;
+		}
+
+		SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
+		sequenceClosedBean.setSequencePropertyKey(sequencePropertyKey);
+		sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
+		sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
+
+		sequencePropMgr.insert(sequenceClosedBean);
+
+		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
+
+		MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+
+		String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
+		ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+				.getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
+
+		// Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+
+		// adding the ack part to the envelope.
+		Iterator sequenceAckIter = ackRMMsgCtx
+				.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+		MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
+
+		MessageContext closeSequenceResponseMsg = null;
+
+		try {
+			closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
+		} catch (AxisFault e1) {
+			throw new SandeshaException(e1);
+		}
+
+		RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg,
+				storageManager);
+
+		while (sequenceAckIter.hasNext()) {
+			SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
+			closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+					sequenceAcknowledgement);
+		}
+		
+		closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+		closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		closeSequenceResponseMsg.setResponseWritten(true);
+
+		closeSeqResponseRMMsg.addSOAPEnvelope();
+
+		AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
+
+		try {
+			engine.send(closeSequenceResponseMsg);
+		} catch (AxisFault e) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse,
+					sequenceId, e.toString());
+			throw new SandeshaException(message, e);
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CloseSequenceProcessor::processInMessage " + Boolean.FALSE);
+		return false;
+	}
+
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: CloseSequenceProcessor::processOutMessage");
+			log.debug("Exit: CloseSequenceProcessor::processOutMessage " + Boolean.FALSE);
+		}
+		return false;
+
+	}
+
+}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -66,7 +66,7 @@
 
 	private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
 
-	public void processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+	public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
 
@@ -264,7 +264,8 @@
 		createSeqRMMsg.pause();
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: CreateSeqMsgProcessor::processInMessage");
+			log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
 	}
 
 	private boolean offerAccepted(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
@@ -300,7 +301,7 @@
 		return true;
 	}
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
 
@@ -316,7 +317,8 @@
 			}
 		}
 		if (log.isDebugEnabled())
-			log.debug("Exit: CreateSeqMsgProcessor::processOutMessage");
+			log.debug("Exit: CreateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+		return false;
 	}
 	
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -61,7 +61,7 @@
 
 	private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
 
-	public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+	public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
@@ -307,14 +307,16 @@
 		createSeqResponseRMMsgCtx.pause();
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
 	}
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 		if (log.isDebugEnabled()) {
 			log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
-			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
+			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
 		}
+		return false;
 
 	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Thu Oct 26 15:14:35 2006
@@ -38,7 +38,7 @@
 	 * A message is selected by the set of SenderBeans that are waiting to be sent.
 	 * This is processed using a SenderWorker. 
 	 */
-	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 		
 		MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
 		Address address = makeConnection.getAddress();
@@ -97,7 +97,7 @@
 		}
 
 		if (senderBean==null) 
-			return;
+			return false;
 			
 		TransportOutDescription transportOut = rmMsgCtx.getMessageContext().getTransportOut();
 		if (transportOut==null) {
@@ -128,6 +128,7 @@
 		worker.setTransportOut(rmMsgCtx.getMessageContext().getTransportOut());
 		
 		worker.run();
+		return false;
 	}
 	
 	private void addMessagePendingHeader (RMMsgContext returnMessage, boolean pending) throws SandeshaException {
@@ -139,7 +140,8 @@
 		
 	}
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		return false;
 	}
 
 	private void setTransportProperties (MessageContext returnMessage, RMMsgContext makeConnectionMessage) {

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -25,6 +25,19 @@
  */
 
 public interface MsgProcessor {
-	public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+	
+	/**
+	 * @param rmMsgCtx
+	 * @return true if the msg context has been paused
+	 * @throws AxisFault
+	 */
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+	
+	/**
+	 * 
+	 * @param rmMsgCtx
+	 * @return true if the msg context has been paused
+	 * @throws AxisFault
+	 */
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -70,7 +70,7 @@
 
 	private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
 
-	public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+	public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
@@ -177,7 +177,8 @@
 		terminateSeqMsg.pause();
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
+			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
 	}
 
 	private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
@@ -327,7 +328,7 @@
 
 	}
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
@@ -393,7 +394,7 @@
 		if (terminated != null && "true".equals(terminated)) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
 			log.debug(message);
-			return;
+			return false;
 		}
 
 		TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
@@ -472,7 +473,8 @@
 		SandeshaUtil.executeAndStore(rmMsgCtx, key);
 
 		if (log.isDebugEnabled())
-			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
+			log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+		return false;
 	}
 
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -42,7 +42,7 @@
 
 	private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
 	
-	public void processInMessage(RMMsgContext terminateResRMMsg)
+	public boolean processInMessage(RMMsgContext terminateResRMMsg)
 			throws AxisFault { 
 		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
 		
@@ -79,11 +79,13 @@
 		// Stop this message travelling further through the Axis runtime
 		terminateResRMMsg.pause();
 
-		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
+		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+		return true;
   }
 
-	public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
 		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
-		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
+		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+		return false;
 	}
 }

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java Thu Oct 26 15:14:35 2006
@@ -46,7 +46,7 @@
 
 	}
 
-	public void invoke(MessageContext msgContext) throws AxisFault {
+	public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
 		
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sandesha2TransportSender::invoke, " + msgContext.getEnvelope().getHeader());
@@ -77,7 +77,7 @@
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sandesha2TransportSender::invoke");
-
+		return InvocationResponse.CONTINUE;
 	}
 
 	//Below methods are not used



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org