You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC

svn commit: r531400 [3/18] - in /webservices/sandesha/trunk/java/modules: client/ core/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,187 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.handlers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This is invoked in the outFlow of an RM endpoint
+ */
+
+public class SandeshaOutHandler extends AbstractHandler {
+
+	private static final long serialVersionUID = 8261092322051924103L;
+
+	private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
+
+	public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+
+		InvocationResponse returnValue = InvocationResponse.CONTINUE;
+		
+		ConfigurationContext context = msgCtx.getConfigurationContext();
+		if (context == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		AxisService axisService = msgCtx.getAxisService();
+		if (axisService == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
+			log.debug(message);
+			throw new AxisFault(message);
+		}
+
+		//see if this message is unreliable i.e. WSRM not requried
+		if(SandeshaUtil.isMessageUnreliable(msgCtx)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+			return returnValue;
+		}
+
+		// Also do not apply RM to fault messages
+		{
+			if(msgCtx.isProcessingFault()) {
+				if(log.isDebugEnabled())
+					log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message " + returnValue);
+				return returnValue ;
+			}
+		}
+		
+		//this will change the execution chain of this message to work correctly in retransmissions.
+		//For e.g. Phases like security will be removed to be called in each retransmission.
+		SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+
+		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+		if (null != DONE && "true".equals(DONE)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: SandeshaOutHandler::invoke, Application processing done " + returnValue);
+			return returnValue;
+		}
+		
+		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+		Transaction transaction = null;
+
+		try {
+			transaction = storageManager.getTransaction();
+			
+			// getting rm message
+			RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+			MsgProcessor msgProcessor = null;
+			int messageType = rmMsgCtx.getMessageType();
+			if(log.isDebugEnabled()) log.debug("Message Type: " + messageType);
+			if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+                if (msgCtx.isServerSide()) {
+                	String inboundSequence = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+                	Long   msgNum = (Long) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER);
+                	
+                    if (inboundSequence != null && msgNum != null) {
+            			msgProcessor = new ApplicationMsgProcessor(inboundSequence, msgNum.longValue());
+                    }
+                } else // if client side.
+                    msgProcessor = new ApplicationMsgProcessor();
+                
+			} else {
+				msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+			}
+
+			if (msgProcessor != null){
+				if(msgProcessor.processOutMessage(rmMsgCtx)){
+					//the msg was paused
+					returnValue = InvocationResponse.SUSPEND;
+				}
+			} else if (messageType==Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+				AckRequestedProcessor ackRequestedProcessor = new AckRequestedProcessor ();
+				if(ackRequestedProcessor.processOutgoingAckRequestMessage (rmMsgCtx)){
+					//the msg was paused
+					returnValue = InvocationResponse.SUSPEND;
+				}
+			}
+				
+			//we need the incoming thread to wait when suspending.
+			//Hence adding the boolean property.
+			//Should be done only to the server side
+			OperationContext opCtx = msgCtx.getOperationContext();
+			if(msgCtx.isServerSide() && opCtx != null && returnValue == InvocationResponse.SUSPEND) {
+				if(log.isDebugEnabled()) log.debug("Setting HOLD_RESPONSE property");
+				opCtx.setProperty(RequestResponseTransport.HOLD_RESPONSE, Boolean.TRUE);
+			}
+
+		} catch (Exception e) {
+			// message should not be sent in a exception situation.
+			msgCtx.pause();
+			returnValue = InvocationResponse.SUSPEND;
+
+			// rolling back the transaction
+			if (transaction != null) {
+				try {
+					transaction.rollback();
+					transaction = null;
+				} catch (Exception e1) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+					log.debug(message, e);
+				}
+			}
+
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
+			throw new AxisFault(message, e);
+		} finally {
+			if (transaction != null) {
+				try {
+					transaction.commit();
+				} catch (Exception e) {
+					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+					log.debug(message, e);
+				}
+			}
+		}
+		if (log.isDebugEnabled())
+			log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
+		return returnValue;
+	}
+
+	public String getName() {
+		return Sandesha2Constants.OUT_HANDLER_NAME;
+	}
+	
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.i18n;
+
+import java.util.Locale;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+
+import org.apache.axis2.i18n.MessageBundle;
+import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.i18n.ProjectResourceBundle;
+
+public class SandeshaMessageHelper {
+  public static final String projectName = "org.apache.sandesha2".intern();
+  public static final String resourceName = "resource".intern();
+  public static final Locale locale = null;
+  public static final String msgBundleKey = projectName; 
+
+  public static final String rootPackageName = "org.apache.sandesha2.i18n".intern();
+
+  public static final ResourceBundle rootBundle =
+          ProjectResourceBundle.getBundle(projectName,
+                  rootPackageName,
+                  resourceName,
+                  locale,
+                  SandeshaMessageHelper.class.getClassLoader(),
+                  null);
+  
+  public static void innit(){
+		MessageBundle bundle = new MessageBundle(
+				projectName,
+        rootPackageName,
+        resourceName,
+        locale,
+        SandeshaMessageHelper.class.getClassLoader(),
+        rootBundle);
+		
+		Messages.addMessageBundle(msgBundleKey, bundle);
+  }
+  
+  
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key The resource key
+   * @return The formatted message
+   */
+  public static String getMessage(String key)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+  
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key  The resource key
+   * @param arg0 The argument to place in variable {0}
+   * @return The formatted message
+   */
+  public static String getMessage(String key, String arg0)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key, arg0);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+  
+
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key  The resource key
+   * @param arg0 The argument to place in variable {0}
+   * @param arg1 The argument to place in variable {1}
+   * @return The formatted message
+   */
+  public static String getMessage(String key, String arg0, String arg1)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+  
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key  The resource key
+   * @param arg0 The argument to place in variable {0}
+   * @param arg1 The argument to place in variable {1}
+   * @param arg2 The argument to place in variable {2}
+   * @return The formatted message
+   */
+  public static String getMessage(String key, String arg0, String arg1, String arg2)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+  
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key  The resource key
+   * @param arg0 The argument to place in variable {0}
+   * @param arg1 The argument to place in variable {1}
+   * @param arg2 The argument to place in variable {2}
+   * @param arg3 The argument to place in variable {3}
+   * @return The formatted message
+   */
+  public static String getMessage(String key, String arg0, String arg1, String arg2, String arg3)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2, arg3);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+  
+  /**
+   * Get a message from resource.properties from the package of the given object.
+   *
+   * @param key  The resource key
+   * @param arg0 The argument to place in variable {0}
+   * @param arg1 The argument to place in variable {1}
+   * @param arg2 The argument to place in variable {2}
+   * @param arg3 The argument to place in variable {3}
+   * @param arg4 The argument to place in variable {4}
+   * @return The formatted message
+   */
+  public static String getMessage(String key, String arg0, String arg1, String arg2, String arg3, String arg4)
+          throws MissingResourceException{
+  	try{
+  		return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2, arg3, arg4);
+  	}
+  	catch(MissingResourceException e){
+  		throw e;
+  	}
+  	catch(Exception e){
+  		return null;
+  	}
+  }
+
+  
+  
+
+}
+

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.i18n;
+
+public class SandeshaMessageKeys {
+
+
+	public static final String cannotInitInMemoryStorageManager="cannotInitInMemoryStorageManager";
+	public static final String cannotInitPersistentStorageManager="cannotInitPersistentStorageManager";
+	public static final String cannotProceedDueToStorageManager="cannotProceedDueToStorageManager";
+	public static final String cannotGetStorageKey="cannotGetStorageKey";
+	public static final String cannotGetStorageManager="cannotGetStorageManager";
+	public static final String storageManagerMustImplement="storageManagerMustImplement";
+	public static final String cannotInitSecurityManager="cannotInitSecurityManager";
+	public static final String securityManagerMustImplement="securityManagerMustImplement";
+	public static final String cannotFindModulePolicies="cannotFindModulePolicies";
+	public static final String cannotPauseThread = "cannotPauseThread";
+
+	public static final String commitError="commitError";
+	public static final String rollbackError="rollbackError";
+	public static final String deadlock="deadlock";
+	public static final String noTransaction="noTransaction";
+	public static final String inMsgError="inMsgError";
+	public static final String outMsgError="outMsgError";
+	public static final String invokeMsgError="invokeMsgError";
+	public static final String sendMsgError="sendMsgError";
+	public static final String cannotSendMsgAsSequenceClosed="cannotSendMsgAsSequenceClosed";
+	public static final String cannotSendMsgAsSequenceTerminated="cannotSendMsgAsSequenceTerminated";
+	public static final String cannotSendMsgAsSequenceTimedout="cannotSendMsgAsSequenceTimedout";
+	public static final String noValidSyncResponse="noValidSyncResponse";
+	public static final String classLoaderNotFound="classLoaderNotFound";
+
+	public static final String defaultPropertyBeanNotSet="defaultPropertyBeanNotSet";
+	public static final String propertyBeanNotSet="propertyBeanNotSet";
+	public static final String optionsObjectNotSet="optionsObjectNotSet";
+	public static final String serviceContextNotSet="serviceContextNotSet";
+	public static final String sequenceIdBeanNotSet="sequenceIdBeanNotSet";
+	public static final String configContextNotSet="configContextNotSet";
+	public static final String soapEnvNotSet="soapEnvNotSet";
+	public static final String soapBodyNotPresent="soapBodyNotPresent";
+	public static final String unknownSoapVersion="unknownSoapVersion";
+	public static final String axisServiceIsNull="axisServiceIsNull";
+	public static final String msgContextNotSet="msgContextNotSet";
+	public static final String transportOutNotPresent="transportOutNotPresent";
+	public static final String couldNotFindOperation="couldNotFindOperation";
+	public static final String cannotChooseAcksTo="cannotChooseAcksTo";
+	public static final String cannotChooseSpecLevel="cannotChooseSpecLevel";
+
+	public static final String setAValidMsgNumber="setAValidMsgNumber";
+	public static final String cannotStartListenerForIncommingMsgs="cannotStartListenerForIncommingMsgs";
+	public static final String nonUniqueResult="nonUniqueResult";
+	public static final String invalidStringArray="invalidStringArray";
+	public static final String cannotCointinueSender="cannotCointinueSender";
+	public static final String sendHasUnavailableMsgEntry="sendHasUnavailableMsgEntry";
+	public static final String propertyInvalidValue="propertyInvalidValue";
+	public static final String invalidRange="invalidRange";
+	public static final String workAlreadyAssigned="workAlreadyAssigned";
+
+
+	public static final String rmNamespaceNotMatchSequence="rmNamespaceNotMatchSequence";
+	public static final String unknownWSAVersion="unknownWSAVersion";
+	public static final String emptyAckRequestSpecLevel="emptyAckRequestSpecLevel";
+	public static final String closeSequenceSpecLevel="closeSequenceSpecLevel";
+	public static final String unknownSpec="unknownSpec";
+	public static final String unknownRMNamespace="unknownRMNamespace";
+	public static final String unknownNamespace="unknownNamespace";
+	public static final String cannotDecideRMVersion="cannotDecideRMVersion";
+	public static final String specDoesNotSupportElement="specDoesNotSupportElement";
+		
+	public static final String couldNotSendTerminate="couldNotSendTerminate";
+	public static final String couldNotSendClose="couldNotSendClose";
+	public static final String couldNotSendTerminateResponse="couldNotSendTerminateResponse";
+	public static final String couldNotSendTerminateSeqNotFound="couldNotSendTerminateSeqNotFound";
+	public static final String cannotSendAckRequestException="cannotSendAckRequestException";
+	public static final String ackRequestMultipleParts="ackRequestMultipleParts";
+	public static final String noAckRequestPartFound="noAckRequestPartFound";
+	public static final String noSequenceEstablished="noSequenceEstablished";
+	public static final String invalidInternalSequenceID="invalidInternalSequenceID";
+	public static final String createSeqEntryNotFound="createSeqEntryNotFound";
+
+	public static final String toEPRNotValid="toEPRNotValid";
+	public static final String cannotFindSequence="cannotFindSequence";
+	public static final String msgNumberMustBeLargerThanZero="msgNumberMustBeLargerThanZero";
+	public static final String msgNumberLargerThanLastMsg="msgNumberLargerThanLastMsg";
+	public static final String msgNumberNotLargerThanLastMsg="msgNumberNotLargerThanLastMsg";
+	public static final String ackInvalidNotSent="ackInvalidNotSent";
+	public static final String cannotHaveFinalWithNack="cannotHaveFinalWithNack";
+	public static final String accptButNoSequenceOffered="accptButNoSequenceOffered";
+	public static final String relatesToNotAvailable="relatesToNotAvailable";
+	public static final String cannotDerriveAckInterval="cannotDerriveAckInterval";
+	public static final String cannotDerriveRetransInterval="cannotDerriveRetransInterval";
+	public static final String cannotDerriveInactivityTimeout="cannotDerriveInactivityTimeout";
+	public static final String noCreateSeqParts="noCreateSeqParts";
+	public static final String noAcceptPart="noAcceptPart";
+	public static final String noAcksToPartInCreateSequence="noAcksToPartInCreateSequence";
+	public static final String tempSeqIdNotSet="tempSeqIdNotSet";
+	public static final String ackRandDoesNotHaveCorrectValues="ackRandDoesNotHaveCorrectValues";
+	public static final String cannotSetAckRangeNullElement="cannotSetAckRangeNullElement";
+	public static final String acksToStrNotSet="acksToStrNotSet";
+	public static final String invalidSequenceID="invalidsequenceID";
+	public static final String cantSendMakeConnectionNoTransportOut="cantSendMakeConnectionNoTransportOut";
+	public static final String makeConnectionDisabled="makeConnectionDisabled";
+	
+	public static final String noCreateSeqResponse="noCreateSeqResponse";
+	public static final String noTerminateSeqPart="noTerminateSeqPart";
+	public static final String noNackInSeqAckPart="noNackInSeqAckPart";
+	public static final String nackDoesNotContainValidLongValue="nackDoesNotContainValidLongValue";
+	public static final String seqAckPartIsNull="seqAckPartIsNull";
+	public static final String noneNotAllowedNamespace="noneNotAllowedNamespace";
+	public static final String noneNotAllowedAckRangesPresent="noneNotAllowedAckRangesPresent";
+	public static final String noneNotAllowedNackPresent="noneNotAllowedNackPresent";
+	public static final String finalNotAllowedNamespace="finalNotAllowedNamespace";
+	public static final String noFaultCodeNullElement="noFaultCodeNullElement";
+	public static final String noSeqFaultInElement="noSeqFaultInElement";
+	public static final String noSeqOfferInElement="noSeqOfferInElement";
+	public static final String noTerminateSeqInElement="noTerminateSeqInElement";
+	public static final String noTerminateSeqResponseInElement="noTerminateSeqResponseInElement";
+	public static final String noAcceptPartInElement="noAcceptPartInElement";
+	public static final String noUpperOrLowerAttributesInElement="noUpperOrLowerAttributesInElement";
+	public static final String noSequencePartInElement="noSequencePartInElement";
+	public static final String noLastMessagePartInElement="noLastMessagePartInElement";
+	public static final String noFinalPartInElement="noFinalPartInElement"; 
+	public static final String noNonePartInElement="noNonePartInElement";
+	public static final String noCloseSequencePartInElement="noCloseSequencePartInElement";
+	public static final String noMessageNumberPartInElement="noMessageNumberPartInElement";
+	public static final String noCloseSeqResponsePartInElement="noCloseSeqResponsePartInElement";
+	public static final String noExpiresPartInElement="noExpiresPartInElement";
+	public static final String noCreateSeqPartInElement="noCreateSeqPartInElement";
+	public static final String noCreateSeqResponsePartInElement="noCreateSeqResponsePartInElement";
+	public static final String noFaultCodePart="noFaultCodePart";
+	public static final String cannotFindAddressElement="cannotFindAddressElement";
+	public static final String cannotFindAddressText="cannotFindAddressText";
+	public static final String nullPassedElement="nullPassedElement";
+	public static final String seqPartIsNull="seqPartIsNull";
+	public static final String incomingSequenceNotValidID="incomingSequenceNotValidID";
+
+	public static final String seqFaultCannotBeExtractedToNonHeader="seqFaultCannotBeExtractedToNonHeader";
+	public static final String seqElementCannotBeAddedToNonHeader="seqElementCannotBeAddedToNonHeader";
+	public static final String ackRequestedCannotBeAddedToNonHeader="ackRequestedCannotBeAddedToNonHeader";
+	public static final String terminateSeqCannotBeAddedToNonBody="terminateSeqCannotBeAddedToNonBody";
+	public static final String terminateSeqResponseCannotBeAddedToNonBody="terminateSeqResponseCannotBeAddedToNonBody";
+	public static final String closeSeqCannotBeAddedToNonBody="closeSeqCannotBeAddedToNonBody";
+	public static final String closeSeqResponseCannotBeAddedToNonBody="closeSeqResponseCannotBeAddedToNonBody";
+	public static final String createSeqCannotBeAddedToNonBody="createSeqCannotBeAddedToNonBody";
+	public static final String createSeqResponseCannotBeAddedToNonBody="createSeqResponseCannotBeAddedToNonBody";
+	public static final String closeSeqPartNullID="closeSeqPartNullID";
+	public static final String invalidIdentifier="invalidIdentifier";
+	public static final String closeSeqResponsePartNullID="closeSeqResponsePartNullID";
+	public static final String ackRequestNullID="ackRequestNullID";
+	public static final String createSeqNullAcksTo="createSeqNullAcksTo";
+	public static final String acceptNullAcksTo="acceptNullAcksTo";
+	public static final String noAcksToPart="noAcksToPart";
+	public static final String cannotProcessExpires="cannotProcessExpires";
+	public static final String noFaultCode="noFaultCode";
+
+	public static final String cannotSetAcksTo="cannotSetAcksTo";
+	public static final String cannotSetEndpoint="cannotSetEndpoint";
+	public static final String invalidMsgNumber="invalidMsgNumber";
+	public static final String addressNotValid="addressNotValid";
+
+	public static final String incommingSequenceReportNotFound="incommingSequenceReportNotFound";
+	public static final String cannotFindReportForGivenData="cannotFindReportForGivenData";
+
+	public static final String outSeqIDIsNull="outSeqIDIsNull";
+	public static final String requestMsgNotPresent="requestMsgNotPresent";
+	public static final String newSeqIdIsNull="newSeqIdIsNull";
+	public static final String terminateAddedPreviously="terminateAddedPreviously";
+	public static final String nullMsgId="nullMsgId";
+	public static final String failedToStoreMessage="failedToStoreMessage";
+	public static final String failedToLoadMessage="failedToLoadMessage";
+	public static final String entryNotPresentForUpdating="entryNotPresentForUpdating";
+	public static final String appMsgIsNull="appMsgIsNull";
+	public static final String cannotFindReqMsgFromOpContext="cannotFindReqMsgFromOpContext";
+
+	public static final String secureDummyNoProof="secureDummyNoProof";
+	public static final String secureDummyNoToken="secureDummyNoToken";
+	public static final String secureDummyNoSTR  ="secureDummyNoSTR";
+	
+	public static final String cannotFindTransportInDesc = "cannotFindTransportInDesc";
+	public static final String toEPRNotSet = "toEPRNotSet";
+	public static final String toBeanNotSet = "toBeanNotSet";
+	    
+    
+	public final static String errorRetrievingSecurityToken = "errorRetrievingSecurityToken";
+	public final static String proofOfPossessionNotVerified = "proofOfPossessionNotVerified";
+	public final static String noSecurityResults = "noSecurityResults";
+	public final static String noSecConvTokenInPolicy = "noSecConvTokenInPolicy";
+	    
+	public final static String elementMustForSpec = "elementMustForSpec";
+	public final static String couldNotSendCreateSeqResponse = "couldNotSendCreateSeqResponse";
+	public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
+	public final static String invokerNotFound="invokerNotFound";
+	    
+	public final static String couldNotSendCloseResponse="couldNotSendCloseResponse";
+	
+	public final static String couldNotLoadModulePolicies = "couldNotLoadModulePolicies";
+	public final static String modulePoliciesLoaded = "modulePoliciesLoaded";
+	
+	public final static String createSequenceRefused = "createSequenceRefused";
+	public final static String referencedMessageNotFound = "referencedMessageNotFound";
+	public final static String messageNumberRollover = "messageNumberRollover";
+	public final static String sequenceTerminatedFault = "sequenceTerminatedFault";
+	public static final String unknownSequenceFault="unknownSequenceFault";
+	public static final String invalidAckFault="invalidAckFault";
+	public static final String cannotAcceptMsgAsSequenceClosedFault="cannotAcceptMsgAsSequenceClosedFault"; 
+	
+	public final static String policyBeanNotFound = "policyBeanNotFound";
+	public final static String cloneDoesNotMatchToOriginal = "cloneDoesNotMatchToOriginal";
+	public final static String exceptionInFlowCompletion = "exceptionInFlowCompletion";
+	public final static String rmdBeanNotFound = "rmdBeanNotFound";
+	public final static String rmEnforceFailure = "rmEnforceFailure";
+	public final static String policyHasNotBeenSet = "policyHasNotBeenSet";
+	public final static String referenceMessageNotSetForSequence = "referenceMessageNotSetForSequence";
+	public final static String moduleNotSet = "moduleNotSet";
+	public final static String cannotSetPolicyBeanServiceNull = "cannotSetPolicyBeanServiceNull";
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,336 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.AckRequested;
+
+/**
+ * Responsible for processing ack requested headers on incoming messages.
+ */
+
+public class AckRequestedProcessor extends WSRMMessageSender {
+
+	private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
+
+	public boolean processAckRequestedHeaders(RMMsgContext message) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
+
+		SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
+		SOAPHeader header = envelope.getHeader();
+		boolean msgCtxPaused = false;
+		if(header!=null)
+		{
+			for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
+				QName headerName = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED);
+				
+				Iterator acks = header.getChildrenWithName(headerName);
+				while(acks.hasNext()) {
+					OMElement ack = (OMElement) acks.next();
+					AckRequested ackReq = new AckRequested(headerName.getNamespaceURI());
+					ackReq.fromOMElement(ack);
+					boolean paused = processAckRequestedHeader(message, ack, ackReq);
+					//if nto already paused we might be now
+					if(!msgCtxPaused){
+						msgCtxPaused = paused;
+					}
+				}
+			}			
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders " + msgCtxPaused);
+		return msgCtxPaused;
+	}
+
+	/**
+	 * 
+	 * @param msgContext
+	 * @param soapHeader
+	 * @param ackRequested
+	 * @return true if the msg context was paused
+	 * @throws AxisFault
+	 */
+	public boolean processAckRequestedHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
+
+		String sequenceId = ackRequested.getIdentifier().getIdentifier();
+
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
+		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+				configurationContext.getAxisConfiguration());
+		
+		// Check that the sender of this AckRequest holds the correct token
+		RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+
+		if(rmdBean != null && rmdBean.getSecurityTokenData() != null) {;
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(configurationContext);
+			SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+			
+			secManager.checkProofOfPossession(token, soapHeader, msgContext);
+		}
+
+		// Check that the sequence requested exists
+		if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Unknown sequence ");
+			return false;
+		}
+
+		// throwing a fault if the sequence is terminated
+		if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, rmdBean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Sequence terminated");
+			return false;
+		}
+
+		// Setting the ack depending on AcksTo.
+		EndpointReference acksTo = new EndpointReference(rmdBean.getAcksToEPR());
+		String acksToStr = acksTo.getAddress();
+
+		if (acksToStr == null)
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.ACK,
+				rmdBean.getRMVersion(),
+				msgContext.getAxisService());
+		MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
+
+		ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+		ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
+
+		ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+		SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(msgContext.getEnvelope()));
+
+		// Setting new envelope
+		SOAPEnvelope envelope = factory.getDefaultEnvelope();
+		try {
+			ackMsgCtx.setEnvelope(envelope);
+		} catch (AxisFault e3) {
+			throw new SandeshaException(e3.getMessage());
+		}
+
+		ackMsgCtx.setTo(acksTo);
+		ackMsgCtx.setReplyTo(msgContext.getTo());
+		RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
+		ackRMMsgCtx.getMessageContext().setServerSide(true);
+
+		if (acksTo.hasAnonymousAddress()) {
+
+			AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext().getConfigurationContext());
+
+			// setting CONTEXT_WRITTEN since acksto is anonymous
+			if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+				// operation context will be null when doing in a GLOBAL
+				// handler.
+				
+				ServiceContext serviceCtx = msgContext.getServiceContext();
+				OperationContext opCtx =  ContextFactory.createOperationContext(ackOperation, serviceCtx);
+
+				rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+			}
+
+			rmMsgCtx.getMessageContext().getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
+					Constants.VALUE_TRUE);
+
+			rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+			try {
+				engine.send(ackRMMsgCtx.getMessageContext());
+			} catch (AxisFault e1) {
+				throw new SandeshaException(e1.getMessage());
+			}
+
+		} else {
+
+			SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+			String key = SandeshaUtil.getUUID();
+
+			// dumping to the storage will be done be Sandesha2 Transport Sender
+			// storageManager.storeMessageContext(key,ackMsgCtx);
+
+			SenderBean ackBean = new SenderBean();
+			ackBean.setMessageContextRefKey(key);
+			ackBean.setMessageID(ackMsgCtx.getMessageID());
+			ackBean.setReSend(false);
+			ackBean.setSequenceID(sequenceId);
+			
+			EndpointReference to = ackMsgCtx.getTo();
+			if (to!=null)
+				ackBean.setToAddress(to.getAddress());
+
+			// this will be set to true in the sender.
+			ackBean.setSend(true);
+
+			ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+			ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+			// the internalSequenceId value of the retransmitter Table for the
+			// messages related to an incoming
+			// sequence is the actual sequence ID
+
+			// operation is the lowest level, Sandesha2 can be engaged.
+			SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
+
+			long ackInterval = propertyBean.getAcknowledgementInterval();
+
+			// Ack will be sent as stand alone, only after the retransmitter
+			// interval.
+			long timeToSend = System.currentTimeMillis() + ackInterval;
+
+			// removing old acks.
+			SenderBean findBean = new SenderBean();
+			findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+			findBean.setSend(true);
+			findBean.setReSend(false);
+			Collection coll = retransmitterBeanMgr.find(findBean);
+			Iterator it = coll.iterator();
+
+			if (it.hasNext()) {
+				SenderBean oldAckBean = (SenderBean) it.next();
+				// If there is an old ack. This ack will be sent in the old timeToSend.
+				timeToSend = oldAckBean.getTimeToSend(); 
+				retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+			}
+
+			ackBean.setTimeToSend(timeToSend);
+
+			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+			
+			// passing the message through sandesha2sender
+
+			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+			// inserting the new ack.
+			retransmitterBeanMgr.insert(ackBean);
+
+			msgContext.pause();
+
+			if (log.isDebugEnabled())
+				log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.TRUE);
+			return true;
+		}
+		return false;
+	}
+	
+	/**
+	 * This is used to capture AckRequest messages send by the SandeshaClient.
+	 * This will send that message using the Sandesha2 Sender.
+	 * 
+	 * @param rmMsgContext
+	 */
+	public boolean processOutgoingAckRequestMessage (RMMsgContext ackRequestRMMsg) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: AckRequestedProcessor::processOutgoingAckRequestMessage");
+
+		setupOutMessage(ackRequestRMMsg);
+
+		AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.ACK,
+				getRMVersion(),
+				getMsgContext().getAxisService());
+		getMsgContext().setAxisOperation(ackOperation);
+
+		ServiceContext serviceCtx = getMsgContext().getServiceContext();
+		OperationContext opcontext = ContextFactory.createOperationContext(ackOperation, serviceCtx);
+		opcontext.setParent(getMsgContext().getServiceContext());
+
+		getConfigurationContext().registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
+		getMsgContext().setOperationContext(opcontext);
+		
+		Iterator iterator = ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
+		
+		AckRequested ackRequested = null;
+		while (iterator.hasNext()) {
+			ackRequested = (AckRequested) iterator.next(); 
+		}
+		
+		if (iterator.hasNext()) {
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
+		}
+		
+		if (ackRequested==null) {
+			throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAckRequestPartFound));
+		}
+		
+		ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
+		ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
+
+		sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
+		
+		// Pause the message context
+		ackRequestRMMsg.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);
+		
+		return true;
+
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,257 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing acknowledgement headers on incoming messages.
+ */
+
+public class AcknowledgementProcessor {
+
+	private static final Log log = LogFactory.getLog(AcknowledgementProcessor.class);
+
+	/**
+	 * @param message
+	 * @throws AxisFault
+	 */
+	public void processAckHeaders(RMMsgContext message) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
+
+		SOAPEnvelope envelope = message.getMessageContext().getEnvelope();
+		SOAPHeader header = envelope.getHeader();
+		if(header!=null)
+		{
+			for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
+				QName headerName = new QName(Sandesha2Constants.SPEC_NS_URIS[i], Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
+				
+				Iterator acks = header.getChildrenWithName(headerName);
+				while(acks.hasNext()) {
+					OMElement ack = (OMElement) acks.next();
+					SequenceAcknowledgement seqAck = new SequenceAcknowledgement(headerName.getNamespaceURI());
+					seqAck.fromOMElement(ack);
+					processAckHeader(message, ack, seqAck);
+				}
+			}			
+		}
+
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
+	}
+	
+	/**
+	 * @param rmMsgCtx
+	 * @param soapHeader
+	 * @param sequenceAck
+	 * @throws AxisFault
+	 */
+	private void processAckHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+		throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
+		
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+		ConfigurationContext configCtx = msgCtx.getConfigurationContext();
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+
+		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+
+		String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+
+		if (outSequenceId == null || "".equals(outSequenceId)) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+		if (FaultManager.checkForUnknownSequence(rmMsgCtx, outSequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementProcessor::processAckHeader, Unknown sequence");
+			return;
+		}
+		if (FaultManager.checkForSequenceTerminated(rmMsgCtx, outSequenceId, rmsBean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence terminated");
+			return;
+		}
+
+		// Check that the sender of this Ack holds the correct token
+		String internalSequenceId = rmsBean.getInternalSequenceID();
+		if(rmsBean.getSecurityTokenData() != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+			SecurityToken token = secManager.recoverSecurityToken(rmsBean.getSecurityTokenData());
+			
+			secManager.checkProofOfPossession(token, soapHeader, msgCtx);
+		}
+		
+		if(log.isDebugEnabled()) log.debug("Got Ack for RM Sequence: " + outSequenceId + ", internalSeqId: " + internalSequenceId);
+		Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges().iterator();
+
+		if (FaultManager.checkForUnknownSequence(rmMsgCtx, outSequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementProcessor::processAckHeader, Unknown sequence ");
+			return;
+		}
+		
+		if (FaultManager.checkForInvalidAcknowledgement(rmMsgCtx, sequenceAck, storageManager, rmsBean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack range ");
+			return;
+		}
+		
+		String replyToAddress = rmsBean.getReplyToEPR();
+		EndpointReference replyTo = new EndpointReference (replyToAddress);
+		boolean anonReplyTo = replyTo.hasAnonymousAddress();
+		
+		String rmVersion = rmMsgCtx.getRMSpecVersion();
+		
+		// Compare the clientCompletedMessages with the range we just got, to work out if there
+		// is any new information in this ack message
+		RangeString completedMessages = rmsBean.getClientCompletedMessages();
+		long numberOfNewMessagesAcked = 0;
+
+		while(ackRangeIterator.hasNext()) {
+			AcknowledgementRange ackRange = (AcknowledgementRange) ackRangeIterator.next();
+			long lower = ackRange.getLowerValue();
+			long upper = ackRange.getUpperValue();
+			
+			// Quick check to see if the whole range is covered
+			if(!completedMessages.isRangeCompleted(new Range(lower, upper))) {
+				// We have new info, so take each message one at a time
+				for (long messageNo = lower; messageNo <= upper; messageNo++) {
+					if(!completedMessages.isMessageNumberInRanges(messageNo)) {
+						// We have a new message to consider
+						numberOfNewMessagesAcked++;
+						completedMessages.addRange(new Range(messageNo, messageNo));
+
+						SenderBean matcher = new SenderBean();
+						matcher.setSequenceID(outSequenceId);
+						matcher.setMessageNumber(messageNo);
+						
+						SenderBean retransmitterBean = retransmitterMgr.findUnique(matcher);
+						if (retransmitterBean != null) {
+							// Check we haven't got an Ack for a message that hasn't been sent yet !
+							if (retransmitterBean.getSentCount() == 0) {
+								FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
+										storageManager);
+								if (log.isDebugEnabled())
+									log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
+								return;
+							}
+							
+							String storageKey = retransmitterBean.getMessageContextRefKey();
+							
+							boolean syncResponseNeeded = false;
+							if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
+								MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
+								AxisOperation operation = applicationMessage.getAxisOperation();
+								if(operation!= null) {
+									int mep = operation.getAxisSpecifMEPConstant();
+									syncResponseNeeded = (mep == WSDLConstants.MEP_CONSTANT_OUT_IN);
+								}
+							}
+
+							if (!syncResponseNeeded) {
+								// removing the application message from the storage.
+								retransmitterMgr.delete(retransmitterBean.getMessageID());
+								storageManager.removeMessageContext(storageKey);
+							}
+						}
+					}
+				}
+			}
+		}
+
+		// updating the last activated time of the sequence.
+		rmsBean.setLastActivatedTime(System.currentTimeMillis());
+
+		//adding a MakeConnection for the response sequence if needed.
+		if (rmsBean.getOfferedSequence() != null) {
+
+			RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+			RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
+			
+			if (rMDBean!=null && rMDBean.isPollingMode()) {
+				PollingManager manager = storageManager.getPollingManager();
+				if(manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+			}
+		}
+
+		// We overwrite the previous client completed message ranges with the
+		// latest view, but only if it is an update i.e. contained a new
+		// ack range (which is because we do not previous acks arriving late 
+		// to break us)
+		if (numberOfNewMessagesAcked>0) {
+			rmsBean.setClientCompletedMessages(completedMessages);
+			long noOfMsgsAcked = rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
+			rmsBean.setNumberOfMessagesAcked(noOfMsgsAcked);
+		}
+		
+		// Update the RMSBean
+		storageManager.getRMSBeanMgr().update(rmsBean);
+
+		// Try and terminate the sequence
+		if (!rmsBean.isAvoidAutoTermination()) 
+			TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: AcknowledgementProcessor::processAckHeader");
+	}
+
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,529 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+
+/**
+ * Responsible for processing an incoming Application message.
+ */
+
+public class ApplicationMsgProcessor implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
+
+	private String inboundSequence = null;
+	private long   inboundMessageNumber;
+	
+	public ApplicationMsgProcessor() {
+		// Nothing to do
+	}
+	
+	public ApplicationMsgProcessor(String inboundSequenceId, long inboundMessageNumber) {
+		this.inboundSequence = inboundSequenceId;
+		this.inboundMessageNumber = inboundMessageNumber;
+	}
+	
+	public boolean processInMessage(RMMsgContext rmMsgCtx) {
+		if (log.isDebugEnabled()) {
+			log.debug("Enter: ApplicationMsgProcessor::processInMessage");
+			log.debug("Exit: ApplicationMsgProcessor::processInMessage");
+		}
+		return false;
+	}
+	
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
+
+		MessageContext msgContext = rmMsgCtx.getMessageContext();
+		ConfigurationContext configContext = msgContext.getConfigurationContext();
+
+		// setting the Fault callback
+		SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
+				SandeshaClientConstants.SANDESHA_LISTENER);
+		if (faultCallback != null) {
+			OperationContext operationContext = msgContext.getOperationContext();
+			if (operationContext != null) {
+				operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
+			}
+		}
+
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext
+				.getAxisConfiguration());
+
+		boolean serverSide = msgContext.isServerSide();
+
+		// setting message Id if null
+		if (msgContext.getMessageID() == null)
+			msgContext.setMessageID(SandeshaUtil.getUUID());
+
+		// find internal sequence id
+		String internalSequenceId = null;
+
+		String storageKey = SandeshaUtil.getUUID(); // the key which will be
+													// used to store this
+													// message.
+
+		/*
+		 * Internal sequence id is the one used to refer to the sequence (since
+		 * actual sequence id is not available when first msg arrives) server
+		 * side - a derivation of the sequenceId of the incoming sequence client
+		 * side - a derivation of wsaTo & SeequenceKey
+		 */
+
+		boolean lastMessage = false;
+		if (serverSide) {
+			if (inboundSequence == null || "".equals(inboundSequence)) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, inboundSequence);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(inboundSequence);
+
+			// Deciding whether this is the last message. We assume it is if it relates to
+			// a message which arrived with the LastMessage flag on it.
+			RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);			
+			// Get the last in message
+			String lastRequestId = rmdBean.getLastInMessageId();
+			RelatesTo relatesTo = msgContext.getRelatesTo();
+			if(relatesTo != null && lastRequestId != null &&
+					lastRequestId.equals(relatesTo.getValue())) {
+				lastMessage = true;
+			}
+
+		} else {
+			// set the internal sequence id for the client side.
+			EndpointReference toEPR = msgContext.getTo();
+			if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
+				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+				log.debug(message);
+				throw new SandeshaException(message);
+			}
+
+			String to = toEPR.getAddress();
+			String sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+			internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+
+			String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
+			if (lastAppMessage != null && "true".equals(lastAppMessage))
+				lastMessage = true;
+		}
+		
+		if (internalSequenceId!=null)
+			rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+
+		/*
+		 * checking weather the user has given the messageNumber (most of the
+		 * cases this will not be the case where the system will generate the
+		 * message numbers
+		 */
+
+		// User should set it as a long object.
+		Long messageNumberLng = (Long) msgContext.getProperty(SandeshaClientConstants.MESSAGE_NUMBER);
+
+		long givenMessageNumber = -1;
+		if (messageNumberLng != null) {
+			givenMessageNumber = messageNumberLng.longValue();
+			if (givenMessageNumber <= 0) {
+				throw new SandeshaException(SandeshaMessageHelper.getMessage(
+						SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long.toString(givenMessageNumber)));
+			}
+		}
+
+		// A dummy message is a one which will not be processed as a actual
+		// application message.
+		// The RM handlers will simply let these go.
+		String dummyMessageString = (String) msgContext.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+		boolean dummyMessage = false;
+		if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+			dummyMessage = true;
+
+		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+
+		//see if the sequence is closed
+		if(rmsBean != null && rmsBean.isSequenceClosedClient()){
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed, internalSequenceId));
+		}
+
+		//see if the sequence is terminated
+		if(rmsBean != null && rmsBean.isTerminateAdded()) {
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated, internalSequenceId));
+		}
+
+		//see if the sequence is timed out
+		if(rmsBean != null && rmsBean.isTimedOut()){
+			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout, internalSequenceId));
+		}
+
+		//setting the reference msg store key.
+		if (rmsBean!=null && rmsBean.getReferenceMessageStoreKey()==null) {
+			//setting this application message as the reference, if it hsnt already been set.
+			
+			String referenceMsgKey = SandeshaUtil.getUUID();
+			storageManager.storeMessageContext(referenceMsgKey, msgContext);
+			rmsBean.setReferenceMessageStoreKey(referenceMsgKey);
+		}
+		
+		String outSequenceID = null;
+
+		if (rmsBean == null) { 
+			// SENDING THE CREATE SEQUENCE.
+			synchronized (RMSBeanMgr.class) {
+				// There is a timing window where 2 sending threads can hit this point
+				// at the same time and both will create an RMSBean to the same endpoint
+				// with the same internal sequenceid
+				// Check that someone hasn't created the bean
+				rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+				
+				// if first message - setup the sending side sequence - both for the
+				// server and the client sides.
+				if (rmsBean == null) {
+					rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
+					rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+				}
+			}
+		
+		} else {
+			outSequenceID = rmsBean.getSequenceID();
+		}
+		
+		// the message number that was last used.
+		long systemMessageNumber = rmsBean.getNextMessageNumber();
+
+		// The number given by the user has to be larger than the last stored
+		// number.
+		if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg, Long
+					.toString(givenMessageNumber));
+			throw new SandeshaException(message);
+		}
+
+		// Finding the correct message number.
+		long messageNumber = -1;
+		if (givenMessageNumber > 0) // if given message number is valid use it.
+									// (this is larger than the last stored due
+									// to the last check)
+			messageNumber = givenMessageNumber;
+		else if (systemMessageNumber > 0) { // if system message number is valid
+											// use it.
+			messageNumber = systemMessageNumber + 1;
+		} else { // This is the first message (systemMessageNumber = -1)
+			messageNumber = 1;
+		}
+		
+		if (lastMessage) {
+			rmsBean.setLastOutMessage(messageNumber);
+			// Update the rmsBean
+			storageManager.getRMSBeanMgr().update(rmsBean);
+		}
+
+		// set this as the response highest message.
+		rmsBean.setHighestOutMessageNumber(messageNumber);
+		
+		// saving the used message number, and the expected reply count
+		boolean startPolling = false;
+		if (!dummyMessage) {
+			rmsBean.setNextMessageNumber(messageNumber);
+
+			// Identify the MEP associated with the message.
+			AxisOperation op = msgContext.getAxisOperation();
+			int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+			if(op != null) {
+				mep = op.getAxisSpecifMEPConstant();
+			}
+
+			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+				long expectedReplies = rmsBean.getExpectedReplies();
+				rmsBean.setExpectedReplies(expectedReplies + 1);
+
+				// If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+				//(do should be done only for WSRM 1.1)
+				
+				if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmMsgCtx.getRMSpecVersion())) {
+					EndpointReference oldEndpoint = msgContext.getReplyTo();
+					String oldAddress = (oldEndpoint == null) ? null : oldEndpoint.getAddress();
+					EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext
+							.getReplyTo(), configContext);
+					String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
+					if (newAddress != null && !newAddress.equals(oldAddress)) {
+						// We have rewritten the replyTo. If this is the first message that we have needed to
+						// rewrite then we should set the sequence up for polling, and once we have saved the
+						// changes to the sequence then we can start the polling thread.
+						msgContext.setReplyTo(newReplyTo);
+						if (!rmsBean.isPollingMode()) {
+							rmsBean.setPollingMode(true);
+							startPolling = true;
+						}
+					}
+				}
+			}
+		}
+		
+		RelatesTo relatesTo = msgContext.getRelatesTo();
+		if(relatesTo != null) {
+			rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
+		}
+
+		// setting async ack endpoint for the server side. (if present)
+		if (serverSide) {
+			if (rmsBean.getToEPR() != null) {
+				msgContext.setProperty(SandeshaClientConstants.AcksTo, rmsBean.getToEPR());
+			}
+		}
+
+		// Update the rmsBean
+		storageManager.getRMSBeanMgr().update(rmsBean);
+		
+		if(startPolling) {
+			SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
+		}
+		
+		SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
+		if (env == null) {
+			SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
+					.getDefaultEnvelope();
+			rmMsgCtx.setSOAPEnvelop(envelope);
+		}
+
+		SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
+		if (soapBody == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		String messageId1 = SandeshaUtil.getUUID();
+		if (rmMsgCtx.getMessageId() == null) {
+			rmMsgCtx.setMessageId(messageId1);
+		}
+
+		EndpointReference toEPR = msgContext.getTo();
+		if (toEPR == null) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+			log.debug(message);
+			throw new SandeshaException(message);
+		}
+
+		// setting default actions.
+		String to = toEPR.getAddress();
+		String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
+		if (msgContext.getWSAAction() == null) {
+			msgContext.setWSAAction(to + "/" + operationName);
+		}
+		if (msgContext.getSoapAction() == null) {
+			msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+		}
+
+		// processing the response if not an dummy.
+		if (!dummyMessage)
+			processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager);
+		
+		//Users wont be able to get reliable response msgs in the back channel in the back channel of a 
+		//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
+		msgContext.pause();
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+		return true;
+	}
+
+	private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
+			StorageManager storageManager) throws AxisFault {
+
+		if (log.isDebugEnabled())
+			log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+
+		MessageContext applicationMsg = applicationRMMsg.getMessageContext();
+		ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
+
+		// generating a new create sequeuce message.
+		RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
+
+		createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
+		CreateSequence createSequencePart = (CreateSequence) createSeqRMMessage
+				.getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+
+		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+
+		SequenceOffer offer = createSequencePart.getSequenceOffer();
+		if (offer != null) {
+			String offeredSequenceId = offer.getIdentifer().getIdentifier();
+
+			rmsBean.setOfferedSequence(offeredSequenceId);
+		}
+
+		MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
+		createSeqMsg.setRelationships(null); // create seq msg does not
+												// relateTo anything
+		
+		String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key that will be used to store 
+																	   //the create sequence message.
+		
+		rmsBean.setCreateSeqMsgID(createSeqMsg.getMessageID());
+		rmsBean.setCreateSequenceMsgStoreKey(createSequenceMessageStoreKey);
+		
+		//cloning the message and storing it as a reference.
+		MessageContext clonedMessage = SandeshaUtil.cloneMessageContext(createSeqMsg);
+		String clonedMsgStoreKey = SandeshaUtil.getUUID();
+		storageManager.storeMessageContext(clonedMsgStoreKey, clonedMessage);
+		rmsBean.setReferenceMessageStoreKey(clonedMsgStoreKey);
+		
+		SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.MessageContextProperties.SECURITY_TOKEN);
+		if(token != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+			rmsBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
+		}
+		
+		storageManager.getRMSBeanMgr().insert(rmsBean);
+
+		SenderBean createSeqEntry = new SenderBean();
+		createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
+		createSeqEntry.setTimeToSend(System.currentTimeMillis());
+		createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
+		createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
+		// this will be set to true in the sender
+		createSeqEntry.setSend(true);
+		// Indicate that this message is a create sequence
+		createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+		EndpointReference to = createSeqRMMessage.getTo();
+		if (to!=null)
+			createSeqEntry.setToAddress(to.getAddress());
+		// If this message is targetted at an anonymous address then we must not have a transport
+		// ready for it, as the create sequence is not a reply.
+		if(to == null || to.hasAnonymousAddress())
+			createSeqEntry.setTransportAvailable(false);
+
+		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+		
+		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+
+		retransmitterMgr.insert(createSeqEntry);
+
+		// Setup enough of the workers to get this create sequence off the box.
+		SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
+		
+		if (log.isDebugEnabled())
+			log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+		return rmsBean;
+	}
+
+	private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
+			String storageKey, StorageManager storageManager) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
+
+		MessageContext msg = rmMsg.getMessageContext();
+
+		SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+
+		// setting last message
+		boolean lastMessage = false;
+		if (msg.isServerSide()) {
+			Boolean inboundLast = (Boolean) msg.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE);
+			if (inboundLast != null && inboundLast.booleanValue()) {
+				lastMessage = true;
+			}
+
+		} else {
+			// client side
+			Object obj = msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
+			if (obj != null && "true".equals(obj)) {
+				lastMessage = true;
+			}
+		}
+
+		// Now that we have decided which sequence to use for the message, make sure that we secure
+		// it with the correct token.
+		RMMsgCreator.secureOutboundMessage(rmsBean, msg);
+
+		// Retransmitter bean entry for the application message
+		SenderBean appMsgEntry = new SenderBean();
+
+		appMsgEntry.setMessageContextRefKey(storageKey);
+
+		appMsgEntry.setTimeToSend(System.currentTimeMillis());
+		appMsgEntry.setMessageID(rmMsg.getMessageId());
+		appMsgEntry.setMessageNumber(messageNumber);
+		appMsgEntry.setLastMessage(lastMessage);
+		appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+		appMsgEntry.setInboundSequenceId(inboundSequence);
+		appMsgEntry.setInboundMessageNumber(inboundMessageNumber);
+		if (outSequenceID == null) {
+			appMsgEntry.setSend(false);
+		} else {
+			appMsgEntry.setSend(true);
+			// Send will be set to true at the sender.
+			msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+			appMsgEntry.setSequenceID(outSequenceID);
+		}
+		
+		EndpointReference to = rmMsg.getTo();
+		if (to!=null)
+			appMsgEntry.setToAddress(to.getAddress());
+		
+		appMsgEntry.setInternalSequenceID(internalSequenceId);
+
+		msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+		// increasing the current handler index, so that the message will not be
+		// going throught the SandeshaOutHandler again.
+		msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
+
+		SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+		retransmitterMgr.insert(appMsgEntry);
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
+	}
+
+}

Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,187 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing an incoming Close Sequence message. (As introduced
+ * by the WSRM 1.1 specification)
+ */
+
+public class CloseSequenceProcessor extends WSRMMessageSender implements MsgProcessor {
+
+	private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
+
+	public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled())
+			log.debug("Enter: CloseSequenceProcessor::processInMessage");
+
+		ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+		CloseSequence closeSequence = (CloseSequence) rmMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+
+		MessageContext msgCtx = rmMsgCtx.getMessageContext();
+
+		String sequenceId = closeSequence.getIdentifier().getIdentifier();
+		
+		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+				.getAxisConfiguration());
+
+		RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+
+		// Check that the sender of this CloseSequence holds the correct token
+		if(rmdBean != null && rmdBean.getSecurityTokenData() != null) {
+			SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+			OMElement body = msgCtx.getEnvelope().getBody();
+			SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+			secManager.checkProofOfPossession(token, body, msgCtx);
+		}
+
+		if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: CloseSequenceProcessor::processInMessage, Unknown sequence " + sequenceId);
+			return false;
+		}
+		
+		// throwing a fault if the sequence is terminated
+		if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, rmdBean)) {
+			if (log.isDebugEnabled())
+				log.debug("Exit: CloseSequenceProcessor::processInMessage, Sequence terminated");
+			return false;
+		}
+
+		rmdBean.setClosed(true);
+		storageManager.getRMDBeanMgr().update(rmdBean);
+
+		RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, rmdBean, sequenceId, storageManager, true);
+		// adding the ack part(s) to the envelope.
+		Iterator sequenceAckIter = ackRMMsgCtx
+				.getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+		MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
+
+		RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, rmdBean);
+		MessageContext closeSequenceResponseMsg = closeSeqResponseRMMsg.getMessageContext();
+
+		while (sequenceAckIter.hasNext()) {
+			SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
+			closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+					sequenceAcknowledgement);
+		}
+		
+		closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+		closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+		closeSequenceResponseMsg.setResponseWritten(true);
+
+		closeSeqResponseRMMsg.addSOAPEnvelope();
+
+		AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
+
+		try {
+			engine.send(closeSequenceResponseMsg);
+		} catch (AxisFault e) {
+			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCloseResponse,
+					sequenceId, e.toString());
+			throw new SandeshaException(message, e);
+		}
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CloseSequenceProcessor::processInMessage " + Boolean.FALSE);
+		return false;
+	}
+
+	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+		if (log.isDebugEnabled()) 
+			log.debug("Enter: CloseSequenceProcessor::processOutMessage");
+		
+		// Get the data from the message context
+		setupOutMessage(rmMsgCtx);
+
+		//write into the sequence proeprties that the client is now closed
+		getRMSBean().setSequenceClosedClient(true);
+		getStorageManager().getRMSBeanMgr().update(getRMSBean());
+
+		AxisOperation closeOperation = SpecSpecificConstants.getWSRMOperation(
+				Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
+				rmMsgCtx.getRMSpecVersion(),
+				rmMsgCtx.getMessageContext().getAxisService());
+		getMsgContext().setAxisOperation(closeOperation);
+
+
+		OperationContext opcontext = ContextFactory.createOperationContext(closeOperation, getMsgContext().getServiceContext());
+		opcontext.setParent(getMsgContext().getServiceContext());
+
+		getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+		getMsgContext().setOperationContext(opcontext);
+		
+		CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx
+				.getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+		Identifier identifier = closeSequencePart.getIdentifier();
+		if (identifier==null) {
+			identifier = new Identifier (closeSequencePart.getNamespaceValue());
+			closeSequencePart.setIdentifier(identifier);
+		}
+		
+		rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(getRMVersion()));
+		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
+
+		// Send this outgoing message
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
+
+		// Pause the message context
+		rmMsgCtx.pause();
+
+		if (log.isDebugEnabled())
+			log.debug("Exit: CloseSeqMsgProcessor::processOutMessage " + Boolean.TRUE);
+		
+		return true;
+
+	}
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org