You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/04/23 11:55:16 UTC
svn commit: r531400 [3/18] - in /webservices/sandesha/trunk/java/modules:
client/ core/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/sandesha2/ core/src/main/java/org/...
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,187 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.handlers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This is invoked in the outFlow of an RM endpoint
+ */
+
+public class SandeshaOutHandler extends AbstractHandler {
+
+    private static final long serialVersionUID = 8261092322051924103L;
+
+    private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
+
+    /**
+     * Applies Sandesha2 processing to an outgoing message.
+     *
+     * <p>Messages reported as unreliable by {@link SandeshaUtil#isMessageUnreliable},
+     * fault messages, and messages already flagged with
+     * {@link Sandesha2Constants#APPLICATION_PROCESSING_DONE} are passed through
+     * unchanged. Every other message is handed to a message processor inside a
+     * storage transaction: the transaction is committed on normal completion and
+     * rolled back if processing throws.
+     *
+     * @param msgCtx the outgoing message context
+     * @return {@link InvocationResponse#SUSPEND} when the selected processor reports
+     *         that the message was paused, otherwise {@link InvocationResponse#CONTINUE}
+     * @throws AxisFault if the configuration context or the Axis service is missing,
+     *         or if processing fails (the original exception is kept as the cause)
+     */
+    public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+        }
+
+        InvocationResponse returnValue = InvocationResponse.CONTINUE;
+
+        ConfigurationContext context = msgCtx.getConfigurationContext();
+        if (context == null) {
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+            log.debug(message);
+            throw new AxisFault(message);
+        }
+
+        AxisService axisService = msgCtx.getAxisService();
+        if (axisService == null) {
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
+            log.debug(message);
+            throw new AxisFault(message);
+        }
+
+        // see if this message is unreliable i.e. WSRM not required
+        if (SandeshaUtil.isMessageUnreliable(msgCtx)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+            }
+            return returnValue;
+        }
+
+        // Also do not apply RM to fault messages
+        if (msgCtx.isProcessingFault()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message " + returnValue);
+            }
+            return returnValue;
+        }
+
+        // this will change the execution chain of this message to work correctly in retransmissions.
+        // For e.g. Phases like security will be removed to be called in each retransmission.
+        SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+
+        // Compare against the literal so that a missing or non-String property
+        // simply means "not done yet" instead of risking a ClassCastException.
+        Object done = msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+        if ("true".equals(done)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: SandeshaOutHandler::invoke, Application processing done " + returnValue);
+            }
+            return returnValue;
+        }
+
+        msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+        StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+        Transaction transaction = null;
+
+        try {
+            transaction = storageManager.getTransaction();
+
+            // getting rm message
+            RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+            MsgProcessor msgProcessor = null;
+            int messageType = rmMsgCtx.getMessageType();
+            if (log.isDebugEnabled()) {
+                log.debug("Message Type: " + messageType);
+            }
+            if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+                if (msgCtx.isServerSide()) {
+                    String inboundSequence = (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
+                    Long msgNum = (Long) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER);
+
+                    // Without both inbound properties no processor is selected
+                    // and the message is left unprocessed.
+                    if (inboundSequence != null && msgNum != null) {
+                        msgProcessor = new ApplicationMsgProcessor(inboundSequence, msgNum.longValue());
+                    }
+                } else {
+                    // if client side.
+                    msgProcessor = new ApplicationMsgProcessor();
+                }
+            } else {
+                msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+            }
+
+            if (msgProcessor != null) {
+                if (msgProcessor.processOutMessage(rmMsgCtx)) {
+                    // the msg was paused
+                    returnValue = InvocationResponse.SUSPEND;
+                }
+            } else if (messageType == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
+                AckRequestedProcessor ackRequestedProcessor = new AckRequestedProcessor();
+                if (ackRequestedProcessor.processOutgoingAckRequestMessage(rmMsgCtx)) {
+                    // the msg was paused
+                    returnValue = InvocationResponse.SUSPEND;
+                }
+            }
+
+            // we need the incoming thread to wait when suspending.
+            // Hence adding the boolean property.
+            // Should be done only to the server side
+            OperationContext opCtx = msgCtx.getOperationContext();
+            if (msgCtx.isServerSide() && opCtx != null && returnValue == InvocationResponse.SUSPEND) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Setting HOLD_RESPONSE property");
+                }
+                opCtx.setProperty(RequestResponseTransport.HOLD_RESPONSE, Boolean.TRUE);
+            }
+
+        } catch (Exception e) {
+            // message should not be sent in a exception situation.
+            msgCtx.pause();
+            returnValue = InvocationResponse.SUSPEND;
+
+            // Detach the transaction before rolling back so that the finally
+            // block can never commit it, even when the rollback itself fails.
+            Transaction failedTransaction = transaction;
+            transaction = null;
+            if (failedTransaction != null) {
+                try {
+                    failedTransaction.rollback();
+                } catch (Exception e1) {
+                    String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+                    // Log the rollback failure itself; the original exception
+                    // is reported through the AxisFault thrown below.
+                    log.debug(message, e1);
+                }
+            }
+
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
+            throw new AxisFault(message, e);
+        } finally {
+            // Only reached with a non-null transaction on the success path.
+            if (transaction != null) {
+                try {
+                    transaction.commit();
+                } catch (Exception e) {
+                    String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+                    log.debug(message, e);
+                }
+            }
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
+        }
+        return returnValue;
+    }
+
+    /**
+     * @return the handler name, {@link Sandesha2Constants#OUT_HANDLER_NAME}
+     */
+    public String getName() {
+        return Sandesha2Constants.OUT_HANDLER_NAME;
+    }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageHelper.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.i18n;
+
+import java.util.Locale;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+
+import org.apache.axis2.i18n.MessageBundle;
+import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.i18n.ProjectResourceBundle;
+
+/**
+ * Static helper for looking up Sandesha2 messages through the Axis2
+ * {@link Messages} facility, using the bundle key {@link #msgBundleKey}.
+ */
+public class SandeshaMessageHelper {
+    public static final String projectName = "org.apache.sandesha2".intern();
+    public static final String resourceName = "resource".intern();
+    public static final Locale locale = null;
+    public static final String msgBundleKey = projectName;
+
+    public static final String rootPackageName = "org.apache.sandesha2.i18n".intern();
+
+    public static final ResourceBundle rootBundle =
+        ProjectResourceBundle.getBundle(
+            projectName,
+            rootPackageName,
+            resourceName,
+            locale,
+            SandeshaMessageHelper.class.getClassLoader(),
+            null);
+
+    /**
+     * Builds a {@link MessageBundle} backed by {@link #rootBundle} and registers
+     * it with {@link Messages} under {@link #msgBundleKey}.
+     */
+    public static void innit() {
+        ClassLoader loader = SandeshaMessageHelper.class.getClassLoader();
+        MessageBundle sandeshaBundle =
+            new MessageBundle(projectName, rootPackageName, resourceName, locale, loader, rootBundle);
+
+        Messages.addMessageBundle(msgBundleKey, sandeshaBundle);
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @return The message, or <code>null</code> if the lookup fails with any
+     *         exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @param arg0 The argument to place in variable {0}
+     * @return The formatted message, or <code>null</code> if the lookup fails with
+     *         any exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key, String arg0)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key, arg0);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @param arg0 The argument to place in variable {0}
+     * @param arg1 The argument to place in variable {1}
+     * @return The formatted message, or <code>null</code> if the lookup fails with
+     *         any exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key, String arg0, String arg1)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @param arg0 The argument to place in variable {0}
+     * @param arg1 The argument to place in variable {1}
+     * @param arg2 The argument to place in variable {2}
+     * @return The formatted message, or <code>null</code> if the lookup fails with
+     *         any exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key, String arg0, String arg1, String arg2)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @param arg0 The argument to place in variable {0}
+     * @param arg1 The argument to place in variable {1}
+     * @param arg2 The argument to place in variable {2}
+     * @param arg3 The argument to place in variable {3}
+     * @return The formatted message, or <code>null</code> if the lookup fails with
+     *         any exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key, String arg0, String arg1, String arg2, String arg3)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2, arg3);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Looks up a message in the bundle registered under {@link #msgBundleKey}.
+     *
+     * @param key The resource key
+     * @param arg0 The argument to place in variable {0}
+     * @param arg1 The argument to place in variable {1}
+     * @param arg2 The argument to place in variable {2}
+     * @param arg3 The argument to place in variable {3}
+     * @param arg4 The argument to place in variable {4}
+     * @return The formatted message, or <code>null</code> if the lookup fails with
+     *         any exception other than {@link MissingResourceException}
+     * @throws MissingResourceException if the lookup raises it
+     */
+    public static String getMessage(String key, String arg0, String arg1, String arg2, String arg3, String arg4)
+            throws MissingResourceException {
+        try {
+            return Messages.getMessageFromBundle(msgBundleKey, key, arg0, arg1, arg2, arg3, arg4);
+        } catch (Exception failure) {
+            if (failure instanceof MissingResourceException) {
+                throw (MissingResourceException) failure;
+            }
+            return null;
+        }
+    }
+
+}
+
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.i18n;
+
+/**
+ * String constants used as resource keys, e.g. as the <code>key</code> argument
+ * of <code>SandeshaMessageHelper.getMessage</code>. Each constant's value is
+ * spelled the same as its name unless noted on the declaration.
+ */
+public class SandeshaMessageKeys {
+
+    public static final String cannotInitInMemoryStorageManager = "cannotInitInMemoryStorageManager";
+    public static final String cannotInitPersistentStorageManager = "cannotInitPersistentStorageManager";
+    public static final String cannotProceedDueToStorageManager = "cannotProceedDueToStorageManager";
+    public static final String cannotGetStorageKey = "cannotGetStorageKey";
+    public static final String cannotGetStorageManager = "cannotGetStorageManager";
+    public static final String storageManagerMustImplement = "storageManagerMustImplement";
+    public static final String cannotInitSecurityManager = "cannotInitSecurityManager";
+    public static final String securityManagerMustImplement = "securityManagerMustImplement";
+    public static final String cannotFindModulePolicies = "cannotFindModulePolicies";
+    public static final String cannotPauseThread = "cannotPauseThread";
+
+    public static final String commitError = "commitError";
+    public static final String rollbackError = "rollbackError";
+    public static final String deadlock = "deadlock";
+    public static final String noTransaction = "noTransaction";
+    public static final String inMsgError = "inMsgError";
+    public static final String outMsgError = "outMsgError";
+    public static final String invokeMsgError = "invokeMsgError";
+    public static final String sendMsgError = "sendMsgError";
+    public static final String cannotSendMsgAsSequenceClosed = "cannotSendMsgAsSequenceClosed";
+    public static final String cannotSendMsgAsSequenceTerminated = "cannotSendMsgAsSequenceTerminated";
+    public static final String cannotSendMsgAsSequenceTimedout = "cannotSendMsgAsSequenceTimedout";
+    public static final String noValidSyncResponse = "noValidSyncResponse";
+    public static final String classLoaderNotFound = "classLoaderNotFound";
+
+    public static final String defaultPropertyBeanNotSet = "defaultPropertyBeanNotSet";
+    public static final String propertyBeanNotSet = "propertyBeanNotSet";
+    public static final String optionsObjectNotSet = "optionsObjectNotSet";
+    public static final String serviceContextNotSet = "serviceContextNotSet";
+    public static final String sequenceIdBeanNotSet = "sequenceIdBeanNotSet";
+    public static final String configContextNotSet = "configContextNotSet";
+    public static final String soapEnvNotSet = "soapEnvNotSet";
+    public static final String soapBodyNotPresent = "soapBodyNotPresent";
+    public static final String unknownSoapVersion = "unknownSoapVersion";
+    public static final String axisServiceIsNull = "axisServiceIsNull";
+    public static final String msgContextNotSet = "msgContextNotSet";
+    public static final String transportOutNotPresent = "transportOutNotPresent";
+    public static final String couldNotFindOperation = "couldNotFindOperation";
+    public static final String cannotChooseAcksTo = "cannotChooseAcksTo";
+    public static final String cannotChooseSpecLevel = "cannotChooseSpecLevel";
+
+    public static final String setAValidMsgNumber = "setAValidMsgNumber";
+    public static final String cannotStartListenerForIncommingMsgs = "cannotStartListenerForIncommingMsgs";
+    public static final String nonUniqueResult = "nonUniqueResult";
+    public static final String invalidStringArray = "invalidStringArray";
+    public static final String cannotCointinueSender = "cannotCointinueSender";
+    public static final String sendHasUnavailableMsgEntry = "sendHasUnavailableMsgEntry";
+    public static final String propertyInvalidValue = "propertyInvalidValue";
+    public static final String invalidRange = "invalidRange";
+    public static final String workAlreadyAssigned = "workAlreadyAssigned";
+
+
+    public static final String rmNamespaceNotMatchSequence = "rmNamespaceNotMatchSequence";
+    public static final String unknownWSAVersion = "unknownWSAVersion";
+    public static final String emptyAckRequestSpecLevel = "emptyAckRequestSpecLevel";
+    public static final String closeSequenceSpecLevel = "closeSequenceSpecLevel";
+    public static final String unknownSpec = "unknownSpec";
+    public static final String unknownRMNamespace = "unknownRMNamespace";
+    public static final String unknownNamespace = "unknownNamespace";
+    public static final String cannotDecideRMVersion = "cannotDecideRMVersion";
+    public static final String specDoesNotSupportElement = "specDoesNotSupportElement";
+
+    public static final String couldNotSendTerminate = "couldNotSendTerminate";
+    public static final String couldNotSendClose = "couldNotSendClose";
+    public static final String couldNotSendTerminateResponse = "couldNotSendTerminateResponse";
+    public static final String couldNotSendTerminateSeqNotFound = "couldNotSendTerminateSeqNotFound";
+    public static final String cannotSendAckRequestException = "cannotSendAckRequestException";
+    public static final String ackRequestMultipleParts = "ackRequestMultipleParts";
+    public static final String noAckRequestPartFound = "noAckRequestPartFound";
+    public static final String noSequenceEstablished = "noSequenceEstablished";
+    public static final String invalidInternalSequenceID = "invalidInternalSequenceID";
+    public static final String createSeqEntryNotFound = "createSeqEntryNotFound";
+
+    public static final String toEPRNotValid = "toEPRNotValid";
+    public static final String cannotFindSequence = "cannotFindSequence";
+    public static final String msgNumberMustBeLargerThanZero = "msgNumberMustBeLargerThanZero";
+    public static final String msgNumberLargerThanLastMsg = "msgNumberLargerThanLastMsg";
+    public static final String msgNumberNotLargerThanLastMsg = "msgNumberNotLargerThanLastMsg";
+    public static final String ackInvalidNotSent = "ackInvalidNotSent";
+    public static final String cannotHaveFinalWithNack = "cannotHaveFinalWithNack";
+    public static final String accptButNoSequenceOffered = "accptButNoSequenceOffered";
+    public static final String relatesToNotAvailable = "relatesToNotAvailable";
+    public static final String cannotDerriveAckInterval = "cannotDerriveAckInterval";
+    public static final String cannotDerriveRetransInterval = "cannotDerriveRetransInterval";
+    public static final String cannotDerriveInactivityTimeout = "cannotDerriveInactivityTimeout";
+    public static final String noCreateSeqParts = "noCreateSeqParts";
+    public static final String noAcceptPart = "noAcceptPart";
+    public static final String noAcksToPartInCreateSequence = "noAcksToPartInCreateSequence";
+    public static final String tempSeqIdNotSet = "tempSeqIdNotSet";
+    public static final String ackRandDoesNotHaveCorrectValues = "ackRandDoesNotHaveCorrectValues";
+    public static final String cannotSetAckRangeNullElement = "cannotSetAckRangeNullElement";
+    public static final String acksToStrNotSet = "acksToStrNotSet";
+    // NOTE(review): the value is spelled with a lower-case 's' ("invalidsequenceID"),
+    // unlike the constant name - confirm it matches the entry in the resource bundle.
+    public static final String invalidSequenceID = "invalidsequenceID";
+    public static final String cantSendMakeConnectionNoTransportOut = "cantSendMakeConnectionNoTransportOut";
+    public static final String makeConnectionDisabled = "makeConnectionDisabled";
+
+    public static final String noCreateSeqResponse = "noCreateSeqResponse";
+    public static final String noTerminateSeqPart = "noTerminateSeqPart";
+    public static final String noNackInSeqAckPart = "noNackInSeqAckPart";
+    public static final String nackDoesNotContainValidLongValue = "nackDoesNotContainValidLongValue";
+    public static final String seqAckPartIsNull = "seqAckPartIsNull";
+    public static final String noneNotAllowedNamespace = "noneNotAllowedNamespace";
+    public static final String noneNotAllowedAckRangesPresent = "noneNotAllowedAckRangesPresent";
+    public static final String noneNotAllowedNackPresent = "noneNotAllowedNackPresent";
+    public static final String finalNotAllowedNamespace = "finalNotAllowedNamespace";
+    public static final String noFaultCodeNullElement = "noFaultCodeNullElement";
+    public static final String noSeqFaultInElement = "noSeqFaultInElement";
+    public static final String noSeqOfferInElement = "noSeqOfferInElement";
+    public static final String noTerminateSeqInElement = "noTerminateSeqInElement";
+    public static final String noTerminateSeqResponseInElement = "noTerminateSeqResponseInElement";
+    public static final String noAcceptPartInElement = "noAcceptPartInElement";
+    public static final String noUpperOrLowerAttributesInElement = "noUpperOrLowerAttributesInElement";
+    public static final String noSequencePartInElement = "noSequencePartInElement";
+    public static final String noLastMessagePartInElement = "noLastMessagePartInElement";
+    public static final String noFinalPartInElement = "noFinalPartInElement";
+    public static final String noNonePartInElement = "noNonePartInElement";
+    public static final String noCloseSequencePartInElement = "noCloseSequencePartInElement";
+    public static final String noMessageNumberPartInElement = "noMessageNumberPartInElement";
+    public static final String noCloseSeqResponsePartInElement = "noCloseSeqResponsePartInElement";
+    public static final String noExpiresPartInElement = "noExpiresPartInElement";
+    public static final String noCreateSeqPartInElement = "noCreateSeqPartInElement";
+    public static final String noCreateSeqResponsePartInElement = "noCreateSeqResponsePartInElement";
+    public static final String noFaultCodePart = "noFaultCodePart";
+    public static final String cannotFindAddressElement = "cannotFindAddressElement";
+    public static final String cannotFindAddressText = "cannotFindAddressText";
+    public static final String nullPassedElement = "nullPassedElement";
+    public static final String seqPartIsNull = "seqPartIsNull";
+    public static final String incomingSequenceNotValidID = "incomingSequenceNotValidID";
+
+    public static final String seqFaultCannotBeExtractedToNonHeader = "seqFaultCannotBeExtractedToNonHeader";
+    public static final String seqElementCannotBeAddedToNonHeader = "seqElementCannotBeAddedToNonHeader";
+    public static final String ackRequestedCannotBeAddedToNonHeader = "ackRequestedCannotBeAddedToNonHeader";
+    public static final String terminateSeqCannotBeAddedToNonBody = "terminateSeqCannotBeAddedToNonBody";
+    public static final String terminateSeqResponseCannotBeAddedToNonBody = "terminateSeqResponseCannotBeAddedToNonBody";
+    public static final String closeSeqCannotBeAddedToNonBody = "closeSeqCannotBeAddedToNonBody";
+    public static final String closeSeqResponseCannotBeAddedToNonBody = "closeSeqResponseCannotBeAddedToNonBody";
+    public static final String createSeqCannotBeAddedToNonBody = "createSeqCannotBeAddedToNonBody";
+    public static final String createSeqResponseCannotBeAddedToNonBody = "createSeqResponseCannotBeAddedToNonBody";
+    public static final String closeSeqPartNullID = "closeSeqPartNullID";
+    public static final String invalidIdentifier = "invalidIdentifier";
+    public static final String closeSeqResponsePartNullID = "closeSeqResponsePartNullID";
+    public static final String ackRequestNullID = "ackRequestNullID";
+    public static final String createSeqNullAcksTo = "createSeqNullAcksTo";
+    public static final String acceptNullAcksTo = "acceptNullAcksTo";
+    public static final String noAcksToPart = "noAcksToPart";
+    public static final String cannotProcessExpires = "cannotProcessExpires";
+    public static final String noFaultCode = "noFaultCode";
+
+    public static final String cannotSetAcksTo = "cannotSetAcksTo";
+    public static final String cannotSetEndpoint = "cannotSetEndpoint";
+    public static final String invalidMsgNumber = "invalidMsgNumber";
+    public static final String addressNotValid = "addressNotValid";
+
+    public static final String incommingSequenceReportNotFound = "incommingSequenceReportNotFound";
+    public static final String cannotFindReportForGivenData = "cannotFindReportForGivenData";
+
+    public static final String outSeqIDIsNull = "outSeqIDIsNull";
+    public static final String requestMsgNotPresent = "requestMsgNotPresent";
+    public static final String newSeqIdIsNull = "newSeqIdIsNull";
+    public static final String terminateAddedPreviously = "terminateAddedPreviously";
+    public static final String nullMsgId = "nullMsgId";
+    public static final String failedToStoreMessage = "failedToStoreMessage";
+    public static final String failedToLoadMessage = "failedToLoadMessage";
+    public static final String entryNotPresentForUpdating = "entryNotPresentForUpdating";
+    public static final String appMsgIsNull = "appMsgIsNull";
+    public static final String cannotFindReqMsgFromOpContext = "cannotFindReqMsgFromOpContext";
+
+    public static final String secureDummyNoProof = "secureDummyNoProof";
+    public static final String secureDummyNoToken = "secureDummyNoToken";
+    public static final String secureDummyNoSTR = "secureDummyNoSTR";
+
+    public static final String cannotFindTransportInDesc = "cannotFindTransportInDesc";
+    public static final String toEPRNotSet = "toEPRNotSet";
+    public static final String toBeanNotSet = "toBeanNotSet";
+
+
+    public static final String errorRetrievingSecurityToken = "errorRetrievingSecurityToken";
+    public static final String proofOfPossessionNotVerified = "proofOfPossessionNotVerified";
+    public static final String noSecurityResults = "noSecurityResults";
+    public static final String noSecConvTokenInPolicy = "noSecConvTokenInPolicy";
+
+    public static final String elementMustForSpec = "elementMustForSpec";
+    public static final String couldNotSendCreateSeqResponse = "couldNotSendCreateSeqResponse";
+    public static final String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
+    public static final String invokerNotFound = "invokerNotFound";
+
+    public static final String couldNotSendCloseResponse = "couldNotSendCloseResponse";
+
+    public static final String couldNotLoadModulePolicies = "couldNotLoadModulePolicies";
+    public static final String modulePoliciesLoaded = "modulePoliciesLoaded";
+
+    public static final String createSequenceRefused = "createSequenceRefused";
+    public static final String referencedMessageNotFound = "referencedMessageNotFound";
+    public static final String messageNumberRollover = "messageNumberRollover";
+    public static final String sequenceTerminatedFault = "sequenceTerminatedFault";
+    public static final String unknownSequenceFault = "unknownSequenceFault";
+    public static final String invalidAckFault = "invalidAckFault";
+    public static final String cannotAcceptMsgAsSequenceClosedFault = "cannotAcceptMsgAsSequenceClosedFault";
+
+    public static final String policyBeanNotFound = "policyBeanNotFound";
+    public static final String cloneDoesNotMatchToOriginal = "cloneDoesNotMatchToOriginal";
+    public static final String exceptionInFlowCompletion = "exceptionInFlowCompletion";
+    public static final String rmdBeanNotFound = "rmdBeanNotFound";
+    public static final String rmEnforceFailure = "rmEnforceFailure";
+    public static final String policyHasNotBeenSet = "policyHasNotBeenSet";
+    public static final String referenceMessageNotSetForSequence = "referenceMessageNotSetForSequence";
+    public static final String moduleNotSet = "moduleNotSet";
+    public static final String cannotSetPolicyBeanServiceNull = "cannotSetPolicyBeanServiceNull";
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,336 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.AckRequested;
+
+/**
+ * Responsible for processing ack requested headers on incoming messages.
+ */
+
+public class AckRequestedProcessor extends WSRMMessageSender {
+
+ private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
+
+ /**
+  * Scans the SOAP header of the given message for AckRequested header blocks
+  * in each namespace listed in Sandesha2Constants.SPEC_NS_URIS and hands every
+  * block found to {@link #processAckRequestedHeader}.
+  *
+  * @param message the RM message context whose envelope is inspected
+  * @return true if processing any of the headers reported the message context as paused
+  * @throws AxisFault if processing of an individual header fails
+  */
+ public boolean processAckRequestedHeaders(RMMsgContext message) throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
+
+     boolean anyPaused = false;
+     SOAPHeader soapHeader = message.getMessageContext().getEnvelope().getHeader();
+
+     if (soapHeader != null) {
+         for (int ns = 0; ns < Sandesha2Constants.SPEC_NS_URIS.length; ns++) {
+             QName ackRequestedName = new QName(Sandesha2Constants.SPEC_NS_URIS[ns],
+                     Sandesha2Constants.WSRM_COMMON.ACK_REQUESTED);
+
+             for (Iterator blocks = soapHeader.getChildrenWithName(ackRequestedName); blocks.hasNext();) {
+                 OMElement block = (OMElement) blocks.next();
+                 AckRequested parsed = new AckRequested(ackRequestedName.getNamespaceURI());
+                 parsed.fromOMElement(block);
+
+                 // Every header is processed; once any of them pauses the
+                 // message context the flag stays set.
+                 anyPaused |= processAckRequestedHeader(message, block, parsed);
+             }
+         }
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders " + anyPaused);
+     return anyPaused;
+ }
+
+ /**
+  * Processes one AckRequested header: builds an acknowledgement message for
+  * the sequence named in the header and either sends it immediately or queues
+  * it for the sender.
+  * <p>
+  * If the sequence's AcksTo address is anonymous, the acknowledgement is sent
+  * straight away through an AxisEngine and the incoming message is not paused.
+  * Otherwise a SenderBean describing the acknowledgement is stored and the
+  * incoming message context is paused. If FaultManager reports the sequence as
+  * unknown or terminated, no acknowledgement is produced.
+  *
+  * @param rmMsgCtx the RM message context of the incoming message
+  * @param soapHeader the AckRequested header element, used for the proof-of-possession check
+  * @param ackRequested the parsed form of the header
+  * @return true if the msg context was paused
+  * @throws AxisFault if the AcksTo address is not set, or building/sending the acknowledgement fails
+  */
+ public boolean processAckRequestedHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
+
+     String sequenceId = ackRequested.getIdentifier().getIdentifier();
+
+     MessageContext msgContext = rmMsgCtx.getMessageContext();
+     ConfigurationContext configurationContext = msgContext.getConfigurationContext();
+
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
+             configurationContext.getAxisConfiguration());
+
+     // Check that the sender of this AckRequest holds the correct token
+     RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+
+     if (rmdBean != null && rmdBean.getSecurityTokenData() != null) {
+         SecurityManager secManager = SandeshaUtil.getSecurityManager(configurationContext);
+         SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+
+         secManager.checkProofOfPossession(token, soapHeader, msgContext);
+     }
+
+     // Check that the sequence requested exists
+     if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Unknown sequence ");
+         return false;
+     }
+
+     // throwing a fault if the sequence is terminated
+     if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, rmdBean)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Sequence terminated");
+         return false;
+     }
+
+     // Setting the ack depending on AcksTo.
+     // NOTE(review): rmdBean is dereferenced from here on; this presumably
+     // relies on the unknown-sequence check above rejecting a missing bean.
+     EndpointReference acksTo = new EndpointReference(rmdBean.getAcksToEPR());
+     String acksToStr = acksTo.getAddress();
+
+     if (acksToStr == null)
+         throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
+
+     AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+             Sandesha2Constants.MessageTypes.ACK,
+             rmdBean.getRMVersion(),
+             msgContext.getAxisService());
+     MessageContext ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
+
+     ackMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+     RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
+     ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
+
+     ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
+
+     SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(msgContext.getEnvelope()));
+
+     // Setting new envelope
+     SOAPEnvelope envelope = factory.getDefaultEnvelope();
+     try {
+         ackMsgCtx.setEnvelope(envelope);
+     } catch (AxisFault e3) {
+         // keep the original fault as the cause so its stack trace is not lost
+         throw new SandeshaException(e3.getMessage(), e3);
+     }
+
+     ackMsgCtx.setTo(acksTo);
+     ackMsgCtx.setReplyTo(msgContext.getTo());
+     RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean);
+     ackRMMsgCtx.getMessageContext().setServerSide(true);
+
+     if (acksTo.hasAnonymousAddress()) {
+
+         AxisEngine engine = new AxisEngine(ackRMMsgCtx.getMessageContext().getConfigurationContext());
+
+         // setting CONTEXT_WRITTEN since acksto is anonymous
+         if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
+             // operation context will be null when doing in a GLOBAL
+             // handler.
+
+             ServiceContext serviceCtx = msgContext.getServiceContext();
+             OperationContext opCtx = ContextFactory.createOperationContext(ackOperation, serviceCtx);
+
+             rmMsgCtx.getMessageContext().setOperationContext(opCtx);
+         }
+
+         rmMsgCtx.getMessageContext().getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN,
+                 Constants.VALUE_TRUE);
+
+         rmMsgCtx.getMessageContext().setProperty(Sandesha2Constants.ACK_WRITTEN, "true");
+
+         try {
+             engine.send(ackRMMsgCtx.getMessageContext());
+         } catch (AxisFault e1) {
+             // keep the original fault as the cause so its stack trace is not lost
+             throw new SandeshaException(e1.getMessage(), e1);
+         }
+
+     } else {
+
+         SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+
+         String key = SandeshaUtil.getUUID();
+
+         // dumping to the storage will be done by the Sandesha2 Transport Sender
+         // storageManager.storeMessageContext(key,ackMsgCtx);
+
+         SenderBean ackBean = new SenderBean();
+         ackBean.setMessageContextRefKey(key);
+         ackBean.setMessageID(ackMsgCtx.getMessageID());
+         ackBean.setReSend(false);
+         ackBean.setSequenceID(sequenceId);
+
+         EndpointReference to = ackMsgCtx.getTo();
+         if (to != null)
+             ackBean.setToAddress(to.getAddress());
+
+         // this will be set to true in the sender.
+         ackBean.setSend(true);
+
+         ackMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+         ackBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+
+         // the internalSequenceId value of the retransmitter Table for the
+         // messages related to an incoming
+         // sequence is the actual sequence ID
+
+         // operation is the lowest level, Sandesha2 can be engaged.
+         SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
+
+         long ackInterval = propertyBean.getAcknowledgementInterval();
+
+         // Ack will be sent as stand alone, only after the retransmitter
+         // interval.
+         long timeToSend = System.currentTimeMillis() + ackInterval;
+
+         // removing old acks.
+         // NOTE(review): the matcher below is not restricted to sequenceId, so
+         // the first pending ack found may belong to another sequence - confirm
+         // this is intended.
+         SenderBean findBean = new SenderBean();
+         findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+         findBean.setSend(true);
+         findBean.setReSend(false);
+         Collection coll = retransmitterBeanMgr.find(findBean);
+         Iterator it = coll.iterator();
+
+         if (it.hasNext()) {
+             SenderBean oldAckBean = (SenderBean) it.next();
+             // If there is an old ack. This ack will be sent in the old timeToSend.
+             timeToSend = oldAckBean.getTimeToSend();
+             retransmitterBeanMgr.delete(oldAckBean.getMessageID());
+         }
+
+         ackBean.setTimeToSend(timeToSend);
+
+         msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+         // passing the message through sandesha2sender
+
+         SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+
+         // inserting the new ack.
+         retransmitterBeanMgr.insert(ackBean);
+
+         msgContext.pause();
+
+         if (log.isDebugEnabled())
+             log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.TRUE);
+         return true;
+     }
+
+     // Anonymous AcksTo: the ack has already been sent and nothing was paused.
+     if (log.isDebugEnabled())
+         log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.FALSE);
+     return false;
+ }
+
+ /**
+  * This is used to capture AckRequest messages sent by the SandeshaClient.
+  * This will send that message using the Sandesha2 Sender.
+  * <p>
+  * The message must carry exactly one AckRequested part: a message with none,
+  * or with more than one, is rejected with a SandeshaException.
+  *
+  * @param ackRequestRMMsg the outgoing AckRequest message
+  * @return always true; the message context is paused once the message has been handed to the sender
+  * @throws AxisFault if the message does not contain exactly one AckRequested part, or sending fails
+  */
+ public boolean processOutgoingAckRequestMessage (RMMsgContext ackRequestRMMsg) throws AxisFault {
+
+     if (log.isDebugEnabled())
+         log.debug("Enter: AckRequestedProcessor::processOutgoingAckRequestMessage");
+
+     setupOutMessage(ackRequestRMMsg);
+
+     AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
+             Sandesha2Constants.MessageTypes.ACK,
+             getRMVersion(),
+             getMsgContext().getAxisService());
+     getMsgContext().setAxisOperation(ackOperation);
+
+     ServiceContext serviceCtx = getMsgContext().getServiceContext();
+     OperationContext opcontext = ContextFactory.createOperationContext(ackOperation, serviceCtx);
+     opcontext.setParent(getMsgContext().getServiceContext());
+
+     getConfigurationContext().registerOperationContext(ackRequestRMMsg.getMessageId(), opcontext);
+     getMsgContext().setOperationContext(opcontext);
+
+     Iterator iterator = ackRequestRMMsg.getMessageParts(Sandesha2Constants.MessageParts.ACK_REQUEST);
+
+     // Take only the first part. The previous code drained the iterator in a
+     // loop, which made the multiple-parts check below unreachable.
+     AckRequested ackRequested = null;
+     if (iterator.hasNext()) {
+         ackRequested = (AckRequested) iterator.next();
+     }
+
+     // Anything left after the first part means the message has several
+     // AckRequested parts.
+     if (iterator.hasNext()) {
+         throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackRequestMultipleParts));
+     }
+
+     if (ackRequested==null) {
+         throw new SandeshaException (SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noAckRequestPartFound));
+     }
+
+     ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
+     ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
+
+     sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
+
+     // Pause the message context
+     ackRequestRMMsg.pause();
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: AckRequestedProcessor::processOutgoingAckRequestMessage " + Boolean.TRUE);
+
+     return true;
+
+ }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,257 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.Range;
+import org.apache.sandesha2.util.RangeString;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
+import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing acknowledgement headers on incoming messages.
+ */
+
+public class AcknowledgementProcessor {
+
+ private static final Log log = LogFactory.getLog(AcknowledgementProcessor.class);
+
+ /**
+  * Looks through the SOAP header of the given message for
+  * SequenceAcknowledgement header blocks in each namespace listed in
+  * Sandesha2Constants.SPEC_NS_URIS and processes each one found.
+  *
+  * @param message the RM message context whose envelope is inspected
+  * @throws AxisFault if processing of an individual acknowledgement fails
+  */
+ public void processAckHeaders(RMMsgContext message) throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
+
+     SOAPHeader soapHeader = message.getMessageContext().getEnvelope().getHeader();
+
+     if (soapHeader != null) {
+         for (int ns = 0; ns < Sandesha2Constants.SPEC_NS_URIS.length; ns++) {
+             QName seqAckName = new QName(Sandesha2Constants.SPEC_NS_URIS[ns],
+                     Sandesha2Constants.WSRM_COMMON.SEQUENCE_ACK);
+
+             for (Iterator blocks = soapHeader.getChildrenWithName(seqAckName); blocks.hasNext();) {
+                 OMElement block = (OMElement) blocks.next();
+                 SequenceAcknowledgement parsed = new SequenceAcknowledgement(seqAckName.getNamespaceURI());
+                 parsed.fromOMElement(block);
+                 processAckHeader(message, block, parsed);
+             }
+         }
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
+ }
+
+ /**
+  * Applies one SequenceAcknowledgement header to the outbound sequence it
+  * identifies.
+  * <p>
+  * For every message number in the acknowledged ranges that is not already
+  * recorded as completed, the matching SenderBean (if any) and its stored
+  * message context are removed, unless a synchronous response is still
+  * expected for it (RM 1.0 with an anonymous replyTo and an out-in operation).
+  * The RMSBean is then updated and, unless auto termination is disabled,
+  * TerminateManager is asked to check whether the sequence can be terminated.
+  * Processing stops early, without updating the RMSBean, when FaultManager
+  * reports an unknown sequence, a terminated sequence or an invalid
+  * acknowledgement.
+  *
+  * @param rmMsgCtx the RM message context of the incoming message
+  * @param soapHeader the SequenceAcknowledgement header element, used for the proof-of-possession check
+  * @param sequenceAck the parsed form of the header
+  * @throws AxisFault if the sequence identifier is missing or empty, or processing fails
+  */
+ private void processAckHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+         throws AxisFault {
+     if (log.isDebugEnabled())
+         log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
+
+     MessageContext msgCtx = rmMsgCtx.getMessageContext();
+     ConfigurationContext configCtx = msgCtx.getConfigurationContext();
+
+     StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+             .getAxisConfiguration());
+
+     SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
+
+     String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
+
+     // Validate the identifier before it is used for any storage lookup.
+     if (outSequenceId == null || "".equals(outSequenceId)) {
+         String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
+         if (log.isDebugEnabled())
+             log.debug(message);
+         throw new SandeshaException(message);
+     }
+
+     RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
+
+     if (FaultManager.checkForUnknownSequence(rmMsgCtx, outSequenceId, storageManager)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: AcknowledgementProcessor::processAckHeader, Unknown sequence");
+         return;
+     }
+     if (FaultManager.checkForSequenceTerminated(rmMsgCtx, outSequenceId, rmsBean)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence terminated");
+         return;
+     }
+
+     // Check that the sender of this Ack holds the correct token
+     // NOTE(review): rmsBean is dereferenced from here on; this presumably
+     // relies on the unknown-sequence check above rejecting a missing bean.
+     String internalSequenceId = rmsBean.getInternalSequenceID();
+     if (rmsBean.getSecurityTokenData() != null) {
+         SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+         SecurityToken token = secManager.recoverSecurityToken(rmsBean.getSecurityTokenData());
+
+         secManager.checkProofOfPossession(token, soapHeader, msgCtx);
+     }
+
+     if (log.isDebugEnabled())
+         log.debug("Got Ack for RM Sequence: " + outSequenceId + ", internalSeqId: " + internalSequenceId);
+     Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges().iterator();
+
+     // The unknown-sequence check is not repeated here: it was already made
+     // above with exactly the same arguments.
+
+     if (FaultManager.checkForInvalidAcknowledgement(rmMsgCtx, sequenceAck, storageManager, rmsBean)) {
+         if (log.isDebugEnabled())
+             log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack range ");
+         return;
+     }
+
+     String replyToAddress = rmsBean.getReplyToEPR();
+     EndpointReference replyTo = new EndpointReference (replyToAddress);
+     boolean anonReplyTo = replyTo.hasAnonymousAddress();
+
+     String rmVersion = rmMsgCtx.getRMSpecVersion();
+
+     // Compare the clientCompletedMessages with the range we just got, to work out if there
+     // is any new information in this ack message
+     RangeString completedMessages = rmsBean.getClientCompletedMessages();
+     long numberOfNewMessagesAcked = 0;
+
+     while (ackRangeIterator.hasNext()) {
+         AcknowledgementRange ackRange = (AcknowledgementRange) ackRangeIterator.next();
+         long lower = ackRange.getLowerValue();
+         long upper = ackRange.getUpperValue();
+
+         // Quick check to see if the whole range is covered
+         if (!completedMessages.isRangeCompleted(new Range(lower, upper))) {
+             // We have new info, so take each message one at a time
+             for (long messageNo = lower; messageNo <= upper; messageNo++) {
+                 if (!completedMessages.isMessageNumberInRanges(messageNo)) {
+                     // We have a new message to consider
+                     numberOfNewMessagesAcked++;
+                     completedMessages.addRange(new Range(messageNo, messageNo));
+
+                     SenderBean matcher = new SenderBean();
+                     matcher.setSequenceID(outSequenceId);
+                     matcher.setMessageNumber(messageNo);
+
+                     SenderBean retransmitterBean = retransmitterMgr.findUnique(matcher);
+                     if (retransmitterBean != null) {
+                         // Check we haven't got an Ack for a message that hasn't been sent yet !
+                         if (retransmitterBean.getSentCount() == 0) {
+                             FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
+                                     storageManager);
+                             if (log.isDebugEnabled())
+                                 log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
+                             return;
+                         }
+
+                         String storageKey = retransmitterBean.getMessageContextRefKey();
+
+                         // With RM 1.0 and an anonymous replyTo, an out-in
+                         // message is kept until its synchronous response
+                         // has been dealt with.
+                         boolean syncResponseNeeded = false;
+                         if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
+                             MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
+                             AxisOperation operation = applicationMessage.getAxisOperation();
+                             if (operation != null) {
+                                 int mep = operation.getAxisSpecifMEPConstant();
+                                 syncResponseNeeded = (mep == WSDLConstants.MEP_CONSTANT_OUT_IN);
+                             }
+                         }
+
+                         if (!syncResponseNeeded) {
+                             // removing the application message from the storage.
+                             retransmitterMgr.delete(retransmitterBean.getMessageID());
+                             storageManager.removeMessageContext(storageKey);
+                         }
+                     }
+                 }
+             }
+         }
+     }
+
+     // updating the last activated time of the sequence.
+     rmsBean.setLastActivatedTime(System.currentTimeMillis());
+
+     //adding a MakeConnection for the response sequence if needed.
+     if (rmsBean.getOfferedSequence() != null) {
+
+         // NOTE(review): the RMD bean is looked up with the outbound sequence
+         // id although the guard is on the offered sequence - confirm which
+         // id is intended here.
+         RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
+         RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
+
+         if (rMDBean != null && rMDBean.isPollingMode()) {
+             PollingManager manager = storageManager.getPollingManager();
+             if (manager != null) manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+         }
+     }
+
+     // We overwrite the previous client completed message ranges with the
+     // latest view, but only if it is an update i.e. contained a new
+     // ack range (this is because we do not want previous acks arriving
+     // late to break us)
+     if (numberOfNewMessagesAcked > 0) {
+         rmsBean.setClientCompletedMessages(completedMessages);
+         long noOfMsgsAcked = rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
+         rmsBean.setNumberOfMessagesAcked(noOfMsgsAcked);
+     }
+
+     // Update the RMSBean
+     storageManager.getRMSBeanMgr().update(rmsBean);
+
+     // Try and terminate the sequence
+     if (!rmsBean.isAvoidAutoTermination())
+         TerminateManager.checkAndTerminate(rmMsgCtx.getConfigurationContext(), storageManager, rmsBean);
+
+     if (log.isDebugEnabled())
+         log.debug("Exit: AcknowledgementProcessor::processAckHeader");
+ }
+
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,529 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.wsdl.WSDLConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SandeshaListener;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.wsrm.CreateSequence;
+import org.apache.sandesha2.wsrm.SequenceOffer;
+
+/**
+ * Responsible for processing an incoming Application message.
+ */
+
+public class ApplicationMsgProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
+
+ private String inboundSequence = null;
+ private long inboundMessageNumber;
+
+ /**
+  * Creates a processor with no inbound sequence id and an inbound message
+  * number of zero, i.e. the same state the fields have by default.
+  */
+ public ApplicationMsgProcessor() {
+     this((String) null, 0L);
+ }
+
+ /**
+  * Creates a processor that remembers the inbound sequence and message number
+  * it is associated with.
+  *
+  * @param inboundSequenceId id of the inbound sequence, stored as given
+  * @param inboundMessageNumber message number on the inbound sequence, stored as given
+  */
+ public ApplicationMsgProcessor(String inboundSequenceId, long inboundMessageNumber) {
+     this.inboundMessageNumber = inboundMessageNumber;
+     this.inboundSequence = inboundSequenceId;
+ }
+
+ /**
+  * Performs no inbound processing: when debug logging is enabled it traces
+  * entry and exit, and in every case it returns false.
+  *
+  * @param rmMsgCtx the incoming RM message context (not used)
+  * @return always false
+  */
+ public boolean processInMessage(RMMsgContext rmMsgCtx) {
+     final boolean tracing = log.isDebugEnabled();
+     if (tracing)
+         log.debug("Enter: ApplicationMsgProcessor::processInMessage");
+     if (tracing)
+         log.debug("Exit: ApplicationMsgProcessor::processInMessage");
+     return false;
+ }
+
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
+
+ MessageContext msgContext = rmMsgCtx.getMessageContext();
+ ConfigurationContext configContext = msgContext.getConfigurationContext();
+
+ // setting the Fault callback
+ SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
+ SandeshaClientConstants.SANDESHA_LISTENER);
+ if (faultCallback != null) {
+ OperationContext operationContext = msgContext.getOperationContext();
+ if (operationContext != null) {
+ operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
+ }
+ }
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext
+ .getAxisConfiguration());
+
+ boolean serverSide = msgContext.isServerSide();
+
+ // setting message Id if null
+ if (msgContext.getMessageID() == null)
+ msgContext.setMessageID(SandeshaUtil.getUUID());
+
+ // find internal sequence id
+ String internalSequenceId = null;
+
+ String storageKey = SandeshaUtil.getUUID(); // the key which will be
+ // used to store this
+ // message.
+
+ /*
+ * Internal sequence id is the one used to refer to the sequence (since
+ * actual sequence id is not available when first msg arrives) server
+ * side - a derivation of the sequenceId of the incoming sequence client
+ * side - a derivation of wsaTo & SeequenceKey
+ */
+
+ boolean lastMessage = false;
+ if (serverSide) {
+ if (inboundSequence == null || "".equals(inboundSequence)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.incomingSequenceNotValidID, inboundSequence);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ internalSequenceId = SandeshaUtil.getOutgoingSideInternalSequenceID(inboundSequence);
+
+ // Deciding whether this is the last message. We assume it is if it relates to
+ // a message which arrived with the LastMessage flag on it.
+ RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
+ // Get the last in message
+ String lastRequestId = rmdBean.getLastInMessageId();
+ RelatesTo relatesTo = msgContext.getRelatesTo();
+ if(relatesTo != null && lastRequestId != null &&
+ lastRequestId.equals(relatesTo.getValue())) {
+ lastMessage = true;
+ }
+
+ } else {
+ // set the internal sequence id for the client side.
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null || toEPR.getAddress() == null || "".equals(toEPR.getAddress())) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String to = toEPR.getAddress();
+ String sequenceKey = (String) msgContext.getProperty(SandeshaClientConstants.SEQUENCE_KEY);
+ internalSequenceId = SandeshaUtil.getInternalSequenceID(to, sequenceKey);
+
+ String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
+ if (lastAppMessage != null && "true".equals(lastAppMessage))
+ lastMessage = true;
+ }
+
+ if (internalSequenceId!=null)
+ rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
+
+ /*
+ * checking weather the user has given the messageNumber (most of the
+ * cases this will not be the case where the system will generate the
+ * message numbers
+ */
+
+ // User should set it as a long object.
+ Long messageNumberLng = (Long) msgContext.getProperty(SandeshaClientConstants.MESSAGE_NUMBER);
+
+ long givenMessageNumber = -1;
+ if (messageNumberLng != null) {
+ givenMessageNumber = messageNumberLng.longValue();
+ if (givenMessageNumber <= 0) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long.toString(givenMessageNumber)));
+ }
+ }
+
+ // A dummy message is a one which will not be processed as a actual
+ // application message.
+ // The RM handlers will simply let these go.
+ String dummyMessageString = (String) msgContext.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
+ boolean dummyMessage = false;
+ if (dummyMessageString != null && Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString))
+ dummyMessage = true;
+
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+
+ //see if the sequence is closed
+ if(rmsBean != null && rmsBean.isSequenceClosedClient()){
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed, internalSequenceId));
+ }
+
+ //see if the sequence is terminated
+ if(rmsBean != null && rmsBean.isTerminateAdded()) {
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated, internalSequenceId));
+ }
+
+ //see if the sequence is timed out
+ if(rmsBean != null && rmsBean.isTimedOut()){
+ throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout, internalSequenceId));
+ }
+
+ //setting the reference msg store key.
+ if (rmsBean!=null && rmsBean.getReferenceMessageStoreKey()==null) {
+ //setting this application message as the reference, if it hsnt already been set.
+
+ String referenceMsgKey = SandeshaUtil.getUUID();
+ storageManager.storeMessageContext(referenceMsgKey, msgContext);
+ rmsBean.setReferenceMessageStoreKey(referenceMsgKey);
+ }
+
+ String outSequenceID = null;
+
+ if (rmsBean == null) {
+ // SENDING THE CREATE SEQUENCE.
+ synchronized (RMSBeanMgr.class) {
+ // There is a timing window where 2 sending threads can hit this point
+ // at the same time and both will create an RMSBean to the same endpoint
+ // with the same internal sequenceid
+ // Check that someone hasn't created the bean
+ rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
+
+ // if first message - setup the sending side sequence - both for the
+ // server and the client sides.
+ if (rmsBean == null) {
+ rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
+ rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+ }
+ }
+
+ } else {
+ outSequenceID = rmsBean.getSequenceID();
+ }
+
+ // the message number that was last used.
+ long systemMessageNumber = rmsBean.getNextMessageNumber();
+
+ // The number given by the user has to be larger than the last stored
+ // number.
+ if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg, Long
+ .toString(givenMessageNumber));
+ throw new SandeshaException(message);
+ }
+
+ // Finding the correct message number.
+ long messageNumber = -1;
+ if (givenMessageNumber > 0) // if given message number is valid use it.
+ // (this is larger than the last stored due
+ // to the last check)
+ messageNumber = givenMessageNumber;
+ else if (systemMessageNumber > 0) { // if system message number is valid
+ // use it.
+ messageNumber = systemMessageNumber + 1;
+ } else { // This is the first message (systemMessageNumber = -1)
+ messageNumber = 1;
+ }
+
+ if (lastMessage) {
+ rmsBean.setLastOutMessage(messageNumber);
+ // Update the rmsBean
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ }
+
+ // set this as the response highest message.
+ rmsBean.setHighestOutMessageNumber(messageNumber);
+
+ // saving the used message number, and the expected reply count
+ boolean startPolling = false;
+ if (!dummyMessage) {
+ rmsBean.setNextMessageNumber(messageNumber);
+
+ // Identify the MEP associated with the message.
+ AxisOperation op = msgContext.getAxisOperation();
+ int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+ if(op != null) {
+ mep = op.getAxisSpecifMEPConstant();
+ }
+
+ if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+ long expectedReplies = rmsBean.getExpectedReplies();
+ rmsBean.setExpectedReplies(expectedReplies + 1);
+
+ // If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+ // (this should be done only for WSRM 1.1)
+
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmMsgCtx.getRMSpecVersion())) {
+ EndpointReference oldEndpoint = msgContext.getReplyTo();
+ String oldAddress = (oldEndpoint == null) ? null : oldEndpoint.getAddress();
+ EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext
+ .getReplyTo(), configContext);
+ String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
+ if (newAddress != null && !newAddress.equals(oldAddress)) {
+ // We have rewritten the replyTo. If this is the first message that we have needed to
+ // rewrite then we should set the sequence up for polling, and once we have saved the
+ // changes to the sequence then we can start the polling thread.
+ msgContext.setReplyTo(newReplyTo);
+ if (!rmsBean.isPollingMode()) {
+ rmsBean.setPollingMode(true);
+ startPolling = true;
+ }
+ }
+ }
+ }
+ }
+
+ RelatesTo relatesTo = msgContext.getRelatesTo();
+ if(relatesTo != null) {
+ rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
+ }
+
+ // setting async ack endpoint for the server side. (if present)
+ if (serverSide) {
+ if (rmsBean.getToEPR() != null) {
+ msgContext.setProperty(SandeshaClientConstants.AcksTo, rmsBean.getToEPR());
+ }
+ }
+
+ // Update the rmsBean
+ storageManager.getRMSBeanMgr().update(rmsBean);
+
+ if(startPolling) {
+ SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
+ }
+
+ SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
+ if (env == null) {
+ SOAPEnvelope envelope = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil.getSOAPVersion(env))
+ .getDefaultEnvelope();
+ rmMsgCtx.setSOAPEnvelop(envelope);
+ }
+
+ SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
+ if (soapBody == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ String messageId1 = SandeshaUtil.getUUID();
+ if (rmMsgCtx.getMessageId() == null) {
+ rmMsgCtx.setMessageId(messageId1);
+ }
+
+ EndpointReference toEPR = msgContext.getTo();
+ if (toEPR == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
+ log.debug(message);
+ throw new SandeshaException(message);
+ }
+
+ // setting default actions.
+ String to = toEPR.getAddress();
+ String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
+ if (msgContext.getWSAAction() == null) {
+ msgContext.setWSAAction(to + "/" + operationName);
+ }
+ if (msgContext.getSoapAction() == null) {
+ msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
+ }
+
+ // processing the response if not an dummy.
+ if (!dummyMessage)
+ processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager);
+
+ // Users won't be able to get reliable response msgs in the back channel of a
+ // reliable message. A user who doesn't have an endpoint should use the polling mechanisms.
+ msgContext.pause();
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+ return true;
+ }
+
+    /**
+     * Builds the create sequence message for a new outbound sequence, stores it
+     * together with the sequence bean and a sender entry, and starts the workers
+     * for the sequence.
+     *
+     * @param applicationRMMsg the application message the create sequence message is derived from
+     * @param rmsBean the bean of the new sequence; it is updated in place and inserted into storage
+     * @param storageManager gives access to the message store and the bean managers
+     * @return the same <code>rmsBean</code> instance that was passed in
+     * @throws AxisFault propagated from the message creation, storage and sender calls
+     */
+    private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
+            StorageManager storageManager) throws AxisFault {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+        }
+
+        MessageContext appMsgCtx = applicationRMMsg.getMessageContext();
+        ConfigurationContext configurationContext = appMsgCtx.getConfigurationContext();
+
+        // Derive a new create sequence message from the application message.
+        RMMsgContext createSeqRMMsg = RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
+        createSeqRMMsg.setFlow(MessageContext.OUT_FLOW);
+
+        CreateSequence createSeqPart = (CreateSequence) createSeqRMMsg
+                .getMessagePart(Sandesha2Constants.MessageParts.CREATE_SEQ);
+
+        SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+        // Remember the offered sequence id when the message carries an offer.
+        SequenceOffer sequenceOffer = createSeqPart.getSequenceOffer();
+        if (sequenceOffer != null) {
+            rmsBean.setOfferedSequence(sequenceOffer.getIdentifer().getIdentifier());
+        }
+
+        // A create sequence message does not relate to anything.
+        MessageContext createSeqMsgCtx = createSeqRMMsg.getMessageContext();
+        createSeqMsgCtx.setRelationships(null);
+
+        // Key under which the create sequence message itself will be stored.
+        String createSeqStoreKey = SandeshaUtil.getUUID();
+
+        rmsBean.setCreateSeqMsgID(createSeqMsgCtx.getMessageID());
+        rmsBean.setCreateSequenceMsgStoreKey(createSeqStoreKey);
+
+        // Store a clone of the message as the reference message of the sequence.
+        MessageContext referenceCopy = SandeshaUtil.cloneMessageContext(createSeqMsgCtx);
+        String referenceStoreKey = SandeshaUtil.getUUID();
+        storageManager.storeMessageContext(referenceStoreKey, referenceCopy);
+        rmsBean.setReferenceMessageStoreKey(referenceStoreKey);
+
+        // Keep the token recovery data when the message has a security token.
+        SecurityToken securityToken = (SecurityToken) createSeqRMMsg
+                .getProperty(Sandesha2Constants.MessageContextProperties.SECURITY_TOKEN);
+        if (securityToken != null) {
+            SecurityManager securityManager = SandeshaUtil.getSecurityManager(configurationContext);
+            rmsBean.setSecurityTokenData(securityManager.getTokenRecoveryData(securityToken));
+        }
+
+        storageManager.getRMSBeanMgr().insert(rmsBean);
+
+        // Sender entry describing the create sequence message.
+        SenderBean senderEntry = new SenderBean();
+        senderEntry.setMessageContextRefKey(createSeqStoreKey);
+        senderEntry.setTimeToSend(System.currentTimeMillis());
+        senderEntry.setMessageID(createSeqRMMsg.getMessageId());
+        senderEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
+        senderEntry.setSend(true);
+        senderEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
+
+        // If the message has no target, or targets an anonymous address, we must not
+        // have a transport ready for it, as the create sequence is not a reply.
+        EndpointReference target = createSeqRMMsg.getTo();
+        if (target == null) {
+            senderEntry.setTransportAvailable(false);
+        } else {
+            senderEntry.setToAddress(target.getAddress());
+            if (target.hasAnonymousAddress()) {
+                senderEntry.setTransportAvailable(false);
+            }
+        }
+
+        createSeqMsgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+        SandeshaUtil.executeAndStore(createSeqRMMsg, createSeqStoreKey);
+
+        senderBeanMgr.insert(senderEntry);
+
+        // Setup enough of the workers to get this create sequence off the box.
+        SandeshaUtil.startWorkersForSequence(configurationContext, rmsBean);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+        }
+        return rmsBean;
+    }
+
+    /**
+     * Secures an outbound application message, creates the sender entry that
+     * describes it, and stores both the message and the entry.
+     *
+     * @param rmMsg the application message being sent
+     * @param rmsBean the sequence bean used when securing the message
+     * @param internalSequenceId the internal id of the sequence the message belongs to
+     * @param outSequenceID the outbound sequence id, or <code>null</code> if none is
+     *        known yet; when <code>null</code> the entry is not flagged for sending
+     * @param messageNumber the message number recorded on the sender entry
+     * @param storageKey the key under which the message context is stored
+     * @param storageManager gives access to the sender bean manager
+     * @throws AxisFault propagated from the security, storage and sender calls
+     */
+    private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
+            String storageKey, StorageManager storageManager) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
+        }
+
+        MessageContext msgCtx = rmMsg.getMessageContext();
+        SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+        // Work out whether this is the last message: the server side reads the
+        // inbound-last flag, the client side reads the LAST_MESSAGE property.
+        boolean isLast;
+        if (msgCtx.isServerSide()) {
+            Boolean inboundLast = (Boolean) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE);
+            isLast = inboundLast != null && inboundLast.booleanValue();
+        } else {
+            isLast = "true".equals(msgCtx.getProperty(SandeshaClientConstants.LAST_MESSAGE));
+        }
+
+        // Now that we have decided which sequence to use for the message, make sure
+        // that we secure it with the correct token.
+        RMMsgCreator.secureOutboundMessage(rmsBean, msgCtx);
+
+        // Retransmitter bean entry for the application message
+        SenderBean senderEntry = new SenderBean();
+        senderEntry.setMessageContextRefKey(storageKey);
+        senderEntry.setTimeToSend(System.currentTimeMillis());
+        senderEntry.setMessageID(rmMsg.getMessageId());
+        senderEntry.setMessageNumber(messageNumber);
+        senderEntry.setLastMessage(isLast);
+        senderEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+        senderEntry.setInboundSequenceId(inboundSequence);
+        senderEntry.setInboundMessageNumber(inboundMessageNumber);
+
+        // The entry is only flagged for sending once an outbound sequence id is known.
+        boolean hasOutSequence = outSequenceID != null;
+        senderEntry.setSend(hasOutSequence);
+        if (hasOutSequence) {
+            // Send will be set to true at the sender.
+            msgCtx.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+            senderEntry.setSequenceID(outSequenceID);
+        }
+
+        EndpointReference target = rmMsg.getTo();
+        if (target != null) {
+            senderEntry.setToAddress(target.getAddress());
+        }
+
+        senderEntry.setInternalSequenceID(internalSequenceId);
+
+        msgCtx.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+        // Advance the handler index so that the message does not go through the
+        // SandeshaOutHandler again.
+        int nextHandlerIndex = msgCtx.getCurrentHandlerIndex() + 1;
+        msgCtx.setCurrentHandlerIndex(nextHandlerIndex);
+
+        SandeshaUtil.executeAndStore(rmMsg, storageKey);
+
+        senderBeanMgr.insert(senderEntry);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
+        }
+    }
+
+}
Added: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=auto&rev=531400
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (added)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Apr 23 02:54:53 2007
@@ -0,0 +1,187 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ContextFactory;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.SpecSpecificConstants;
+import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.Identifier;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing Close Sequence messages (as introduced by the
+ * WSRM 1.1 specification).
+ * <p>
+ * On the inbound path the targeted sequence is marked as closed and a close
+ * sequence response carrying the sequence acknowledgement part(s) is sent. On
+ * the outbound path the sending-side sequence is flagged as closed by the
+ * client and the Close Sequence message is passed on for sending.
+ */
+public class CloseSequenceProcessor extends WSRMMessageSender implements MsgProcessor {
+
+    private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
+
+    /**
+     * Handles an incoming Close Sequence message: checks the sender's security
+     * token when the sequence has token data, marks the sequence as closed and
+     * sends a close sequence response that carries the acknowledgement part(s).
+     *
+     * @param rmMsgCtx the incoming Close Sequence message
+     * @return always <code>false</code>
+     * @throws AxisFault propagated from the called helpers; a failure to send the
+     *         response is rethrown as a SandeshaException
+     */
+    public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: CloseSequenceProcessor::processInMessage");
+        }
+
+        // The message context and its configuration context are fetched once and
+        // reused for the security check and for sending the response.
+        MessageContext msgCtx = rmMsgCtx.getMessageContext();
+        ConfigurationContext configCtx = msgCtx.getConfigurationContext();
+
+        CloseSequence closeSequence = (CloseSequence) rmMsgCtx
+                .getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+        String sequenceId = closeSequence.getIdentifier().getIdentifier();
+
+        StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+                .getAxisConfiguration());
+
+        RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
+
+        // Check that the sender of this CloseSequence holds the correct token
+        if (rmdBean != null && rmdBean.getSecurityTokenData() != null) {
+            SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
+            OMElement body = msgCtx.getEnvelope().getBody();
+            SecurityToken token = secManager.recoverSecurityToken(rmdBean.getSecurityTokenData());
+            secManager.checkProofOfPossession(token, body, msgCtx);
+        }
+
+        // Stop here if FaultManager reports the sequence as unknown.
+        if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: CloseSequenceProcessor::processInMessage, Unknown sequence " + sequenceId);
+            }
+            return false;
+        }
+
+        // Stop here if FaultManager reports the sequence as terminated.
+        if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, rmdBean)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Exit: CloseSequenceProcessor::processInMessage, Sequence terminated");
+            }
+            return false;
+        }
+
+        rmdBean.setClosed(true);
+        storageManager.getRMDBeanMgr().update(rmdBean);
+
+        RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, rmdBean, sequenceId, storageManager, true);
+        Iterator sequenceAckIter = ackRMMsgCtx
+                .getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+        RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, rmdBean);
+        MessageContext closeSequenceResponseMsg = closeSeqResponseRMMsg.getMessageContext();
+
+        // adding the ack part(s) to the response.
+        while (sequenceAckIter.hasNext()) {
+            SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
+            closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+                    sequenceAcknowledgement);
+        }
+
+        closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+        closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+        closeSequenceResponseMsg.setResponseWritten(true);
+
+        closeSeqResponseRMMsg.addSOAPEnvelope();
+
+        AxisEngine engine = new AxisEngine(configCtx);
+
+        try {
+            engine.send(closeSequenceResponseMsg);
+        } catch (AxisFault e) {
+            // Keep the original fault as the cause so the send failure stays traceable.
+            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendCloseResponse,
+                    sequenceId, e.toString());
+            throw new SandeshaException(message, e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: CloseSequenceProcessor::processInMessage " + Boolean.FALSE);
+        }
+        return false;
+    }
+
+    /**
+     * Handles an outgoing Close Sequence message: flags the sending-side sequence
+     * as closed by the client, attaches the close operation and a new operation
+     * context to the message, sets the actions and passes the message on for
+     * sending before pausing it.
+     *
+     * @param rmMsgCtx the outgoing Close Sequence message
+     * @return always <code>true</code>
+     * @throws AxisFault propagated from the called helpers
+     */
+    public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: CloseSequenceProcessor::processOutMessage");
+        }
+
+        // Get the data from the message context
+        setupOutMessage(rmMsgCtx);
+
+        // write into the sequence properties that the client is now closed
+        getRMSBean().setSequenceClosedClient(true);
+        getStorageManager().getRMSBeanMgr().update(getRMSBean());
+
+        AxisOperation closeOperation = SpecSpecificConstants.getWSRMOperation(
+                Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE,
+                rmMsgCtx.getRMSpecVersion(),
+                rmMsgCtx.getMessageContext().getAxisService());
+        getMsgContext().setAxisOperation(closeOperation);
+
+        OperationContext opcontext = ContextFactory.createOperationContext(closeOperation, getMsgContext().getServiceContext());
+        opcontext.setParent(getMsgContext().getServiceContext());
+
+        getConfigurationContext().registerOperationContext(rmMsgCtx.getMessageId(),opcontext);
+        getMsgContext().setOperationContext(opcontext);
+
+        CloseSequence closeSequencePart = (CloseSequence) rmMsgCtx
+                .getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+        Identifier identifier = closeSequencePart.getIdentifier();
+        if (identifier == null) {
+            // NOTE(review): the new Identifier is created from the namespace value only;
+            // no sequence id is set on it here - confirm it is filled in elsewhere.
+            identifier = new Identifier (closeSequencePart.getNamespaceValue());
+            closeSequencePart.setIdentifier(identifier);
+        }
+
+        rmMsgCtx.setWSAAction(SpecSpecificConstants.getCloseSequenceAction(getRMVersion()));
+        rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
+
+        // Send this outgoing message
+        sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
+
+        // Pause the message context
+        rmMsgCtx.pause();
+
+        // The exit trace uses the real class name so it pairs with the enter trace.
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: CloseSequenceProcessor::processOutMessage " + Boolean.TRUE);
+        }
+
+        return true;
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org