You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ch...@apache.org on 2006/10/27 00:14:37 UTC
svn commit: r468170 - in /webservices/sandesha/trunk/java: ./
src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/msgprocessors/
src/org/apache/sandesha2/transport/
Author: chamikara
Date: Thu Oct 26 15:14:35 2006
New Revision: 468170
URL: http://svn.apache.org/viewvc?view=rev&rev=468170
Log:
Applied the patch from Matt on Sandesha2-32.
Modified:
webservices/sandesha/trunk/java/project.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
Modified: webservices/sandesha/trunk/java/project.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/project.properties?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/project.properties (original)
+++ webservices/sandesha/trunk/java/project.properties Thu Oct 26 15:14:35 2006
@@ -10,7 +10,7 @@
maven.repo.remote=http://www.ibiblio.org/maven/,\
http://people.apache.org/repository/,\
http://dist.codehaus.org/,\
- http://ws.zones.apache.org/~dims/maven/
+ http://ws.zones.apache.org/repository/
# XDOC PLUGIN
###############
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Thu Oct 26 15:14:35 2006
@@ -31,15 +31,12 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
@@ -62,12 +59,14 @@
private static final long serialVersionUID = -7187928423123306156L;
private static final Log log = LogFactory.getLog(SandeshaGlobalInHandler.class.getName());
-
- public void invoke(MessageContext msgContext) throws AxisFault {
+
+ public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SandeshaGlobalInHandler::invoke, " + msgContext.getEnvelope().getHeader());
+ InvocationResponse returnValue = InvocationResponse.CONTINUE;
+
ConfigurationContext configContext = msgContext.getConfigurationContext();
if (configContext == null)
throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet));
@@ -78,21 +77,21 @@
String reinjectedMessage = (String) msgContext.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
if (reinjectedMessage != null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage))
- return; // Reinjected messages are not processed by Sandesha2 inflow
- // handlers
+ return returnValue; // Reinjected messages are not processed by Sandesha2 inflow
+ // handlers
StorageManager storageManager = null;
try {
storageManager = SandeshaUtil
.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration());
if (storageManager == null) {
- log.debug("Sandesha2 cannot proceed. The StorageManager is not available");
- return;
+ log.debug("Sandesha2 cannot proceed. The StorageManager is not available " + returnValue);
+ return returnValue;
}
} catch (SandeshaException e1) {
// TODO make this a log
log.debug("Sandesha2 cannot proceed. Exception thrown when looking for the StorageManager", e1);
- return;
+ return returnValue;
}
boolean withinTransaction = false;
@@ -138,6 +137,7 @@
faultManager.manageIncomingRMFault (axisFault, msgContext);
msgContext.pause();
+ returnValue = InvocationResponse.SUSPEND;
}
}
@@ -147,8 +147,8 @@
boolean isRMGlobalMessage = SandeshaUtil.isRMGlobalMessage(msgContext);
if (!isRMGlobalMessage) {
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaGlobalInHandler::invoke, !isRMGlobalMessage");
- return;
+ log.debug("Exit: SandeshaGlobalInHandler::invoke, !isRMGlobalMessage " + returnValue);
+ return returnValue;
}
RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
@@ -156,10 +156,11 @@
// Dropping duplicates
boolean dropped = dropIfDuplicate(rmMessageContext, storageManager);
if (dropped) {
+ returnValue = InvocationResponse.SUSPEND; //the msg has been paused
processDroppedMessage(rmMessageContext, storageManager);
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaGlobalInHandler::invoke, dropped");
- return;
+ log.debug("Exit: SandeshaGlobalInHandler::invoke, dropped " + returnValue);
+ return returnValue;
}
// Persisting the application messages
@@ -177,6 +178,7 @@
} catch (Exception e) {
// message should not be sent in a exception situation.
msgContext.pause();
+ returnValue = InvocationResponse.SUSPEND;
if (!withinTransaction) {
try {
@@ -205,7 +207,8 @@
}
}
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaGlobalInHandler::invoke");
+ log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
+ return returnValue;
}
private boolean dropIfDuplicate(RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Oct 26 15:14:35 2006
@@ -53,12 +53,14 @@
public String getName() {
return Sandesha2Constants.IN_HANDLER_NAME;
}
-
- public void invoke(MessageContext msgCtx) throws AxisFault {
+
+ public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: SandeshaInHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+ InvocationResponse returnValue = InvocationResponse.CONTINUE;
+
ConfigurationContext context = msgCtx.getConfigurationContext();
if (context == null) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
@@ -69,15 +71,15 @@
String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
if (null != DONE && "true".equals(DONE)) {
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaInHandler::invoke, Application processing done");
- return;
+ log.debug("Exit: SandeshaInHandler::invoke, Application processing done " + returnValue);
+ return returnValue;
}
String reinjectedMessage = (String) msgCtx.getProperty(Sandesha2Constants.REINJECTED_MESSAGE);
if (reinjectedMessage != null && Sandesha2Constants.VALUE_TRUE.equals(reinjectedMessage)) {
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaInHandler::invoke, reinjectedMessage");
- return; // Reinjected messages are not processed by Sandesha2 inflow
+ log.debug("Exit: SandeshaInHandler::invoke, reinjectedMessage " + returnValue);
+ return returnValue; // Reinjected messages are not processed by Sandesha2 inflow
// handlers
}
@@ -100,11 +102,15 @@
// Process Ack headers in the message
AcknowledgementProcessor ackProcessor = new AcknowledgementProcessor();
- ackProcessor.processAckHeaders(msgCtx);
+ if(ackProcessor.processAckHeaders(msgCtx)){
+ returnValue = InvocationResponse.SUSPEND;
+ }
// Process Ack Request headers in the message
AckRequestedProcessor reqProcessor = new AckRequestedProcessor();
- reqProcessor.processAckRequestedHeaders(msgCtx);
+ if(reqProcessor.processAckRequestedHeaders(msgCtx)){
+ returnValue = InvocationResponse.SUSPEND;
+ }
AxisService axisService = msgCtx.getAxisService();
if (axisService == null) {
@@ -125,6 +131,7 @@
//Ack messages will be paused
if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK) {
rmMsgCtx.pause();
+ returnValue = InvocationResponse.SUSPEND;
}
// validating the message
@@ -132,14 +139,20 @@
MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
- if (msgProcessor != null)
- msgProcessor.processInMessage(rmMsgCtx);
+ if (msgProcessor != null){
+ if(msgProcessor.processInMessage(rmMsgCtx)){
+ //this paused the msg
+ returnValue = InvocationResponse.SUSPEND;
+ }
+ }
+
} catch (AxisFault e) {
// message should not be sent in a exception situation.
msgCtx.pause();
-
+ returnValue = InvocationResponse.SUSPEND;
+
if (!withinTransaction) {
try {
transaction.rollback();
@@ -164,7 +177,8 @@
}
}
if (log.isDebugEnabled())
- log.debug("Exit: SandeshaInHandler::invoke");
+ log.debug("Exit: SandeshaInHandler::invoke " + returnValue);
+ return returnValue;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Thu Oct 26 15:14:35 2006
@@ -1,188 +1,197 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.handlers;
-
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContextFactory;
-import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.Parameter;
-import org.apache.axis2.handlers.AbstractHandler;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.client.SandeshaClientConstants;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
-import org.apache.sandesha2.msgprocessors.MsgProcessor;
-import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.util.MsgInitializer;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.wsrm.Sequence;
-
-/**
- * This is invoked in the outFlow of an RM endpoint
- */
-
-public class SandeshaOutHandler extends AbstractHandler {
-
- private static final long serialVersionUID = 8261092322051924103L;
-
- private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
-
- public void invoke(MessageContext msgCtx) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
-
- ConfigurationContext context = msgCtx.getConfigurationContext();
- if (context == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
- log.debug(message);
- throw new AxisFault(message);
- }
-
- AxisService axisService = msgCtx.getAxisService();
- if (axisService == null) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
- log.debug(message);
- throw new AxisFault(message);
- }
-
- //see if this message is unreliable i.e. WSRM not requried
- //look at the msg ctx first
- {
- String unreliable = (String) msgCtx.getProperty(SandeshaClientConstants.UNRELIABLE_MESSAGE);
- if (null != unreliable && "true".equals(unreliable)) {
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message");
- return;
- }
- }
- //look at the operation ctx
- {
- Parameter unreliable = msgCtx.getAxisOperation().getParameter(SandeshaClientConstants.UNRELIABLE_MESSAGE);
- if (null != unreliable && "true".equals(unreliable.getValue())) {
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message");
- return;
- }
- }
- // Also do not apply RM to fault messages
- {
- if(msgCtx.isProcessingFault()) {
- if(log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message");
- return;
- }
- }
-
- //this will change the execution chain of this message to work correctly in retransmissions.
- //For e.g. Phases like security will be removed to be called in each retransmission.
- SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
-
- String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
- if (null != DONE && "true".equals(DONE)) {
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::invoke, Application processing done");
- return;
- }
-
- msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
-
- boolean withinTransaction = false;
- String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
- if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
- withinTransaction = true;
- }
-
- Transaction transaction = null;
- if (!withinTransaction) {
- transaction = storageManager.getTransaction();
- msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
- }
- boolean rolebacked = false;
-
- try {
- // getting rm message
- RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
-
- MsgProcessor msgProcessor = null;
- int messageType = rmMsgCtx.getMessageType();
- if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
- MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
- OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
-
- if (msgCtx.isServerSide()) { // for the server side
- RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
- Sequence sequencePart = (Sequence) reqRMMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- if (sequencePart != null)
- msgProcessor = new ApplicationMsgProcessor();// a rm intended message
- } else // if client side.
- msgProcessor = new ApplicationMsgProcessor();
-
- } else {
- msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
- }
-
- if (msgProcessor != null)
- msgProcessor.processOutMessage(rmMsgCtx);
-
- } catch (Exception e) {
- // message should not be sent in a exception situation.
- msgCtx.pause();
-
- // rolling back the transaction
- if (!withinTransaction) {
- try {
- transaction.rollback();
- msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
- rolebacked = true;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
- log.debug(message, e);
- }
- }
-
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
- throw new AxisFault(message, e);
- } finally {
- if (!withinTransaction && !rolebacked) {
- try {
- transaction.commit();
- msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
- } catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
- }
- }
- }
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::invoke");
- }
-
- public String getName() {
- return Sandesha2Constants.OUT_HANDLER_NAME;
- }
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.handlers;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContextFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.handlers.AbstractHandler;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.msgprocessors.ApplicationMsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessor;
+import org.apache.sandesha2.msgprocessors.MsgProcessorFactory;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.util.MsgInitializer;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.Sequence;
+
+/**
+ * This is invoked in the outFlow of an RM endpoint
+ */
+
+public class SandeshaOutHandler extends AbstractHandler {
+
+ private static final long serialVersionUID = 8261092322051924103L;
+
+ private static final Log log = LogFactory.getLog(SandeshaOutHandler.class.getName());
+
+ public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
+
+ InvocationResponse returnValue = InvocationResponse.CONTINUE;
+
+ ConfigurationContext context = msgCtx.getConfigurationContext();
+ if (context == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ AxisService axisService = msgCtx.getAxisService();
+ if (axisService == null) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
+ log.debug(message);
+ throw new AxisFault(message);
+ }
+
+ //see if this message is unreliable i.e. WSRM not requried
+ //look at the msg ctx first
+ {
+ String unreliable = (String) msgCtx.getProperty(SandeshaClientConstants.UNRELIABLE_MESSAGE);
+ if (null != unreliable && "true".equals(unreliable)) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+ return returnValue;
+ }
+ }
+ //look at the operation ctx
+ {
+ Parameter unreliable = msgCtx.getAxisOperation().getParameter(SandeshaClientConstants.UNRELIABLE_MESSAGE);
+ if (null != unreliable && "true".equals(unreliable.getValue())) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + returnValue);
+ return returnValue;
+ }
+ }
+ // Also do not apply RM to fault messages
+ {
+ if(msgCtx.isProcessingFault()) {
+ if(log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message " + returnValue);
+ return returnValue ;
+ }
+ }
+
+ //this will change the execution chain of this message to work correctly in retransmissions.
+ //For e.g. Phases like security will be removed to be called in each retransmission.
+ SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+
+ String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
+ if (null != DONE && "true".equals(DONE)) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke, Application processing done " + returnValue);
+ return returnValue;
+ }
+
+ msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+
+ boolean withinTransaction = false;
+ String withinTransactionStr = (String) msgCtx.getProperty(Sandesha2Constants.WITHIN_TRANSACTION);
+ if (withinTransactionStr != null && Sandesha2Constants.VALUE_TRUE.equals(withinTransactionStr)) {
+ withinTransaction = true;
+ }
+
+ Transaction transaction = null;
+ if (!withinTransaction) {
+ transaction = storageManager.getTransaction();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+ }
+ boolean rolebacked = false;
+
+ try {
+ // getting rm message
+ RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+
+ MsgProcessor msgProcessor = null;
+ int messageType = rmMsgCtx.getMessageType();
+ if (messageType == Sandesha2Constants.MessageTypes.UNKNOWN) {
+ MessageContext requestMsgCtx = msgCtx.getOperationContext().getMessageContext(
+ OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+
+ if (msgCtx.isServerSide()) { // for the server side
+ RMMsgContext reqRMMsgCtx = MsgInitializer.initializeMessage(requestMsgCtx);
+ Sequence sequencePart = (Sequence) reqRMMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if (sequencePart != null)
+ msgProcessor = new ApplicationMsgProcessor();// a rm intended message
+ } else // if client side.
+ msgProcessor = new ApplicationMsgProcessor();
+
+ } else {
+ msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
+ }
+
+ if (msgProcessor != null){
+ if(msgProcessor.processOutMessage(rmMsgCtx)){
+ //the msg was paused
+ returnValue = InvocationResponse.SUSPEND;
+ }
+ }
+
+
+ } catch (Exception e) {
+ // message should not be sent in a exception situation.
+ msgCtx.pause();
+ returnValue = InvocationResponse.SUSPEND;
+
+ // rolling back the transaction
+ if (!withinTransaction) {
+ try {
+ transaction.rollback();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ rolebacked = true;
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
+ log.debug(message, e);
+ }
+ }
+
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString());
+ throw new AxisFault(message, e);
+ } finally {
+ if (!withinTransaction && !rolebacked) {
+ try {
+ transaction.commit();
+ msgCtx.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_FALSE);
+ } catch (Exception e) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.commitError, e.toString());
+ log.debug(message, e);
+ }
+ }
+ }
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaOutHandler::invoke " + returnValue);
+ return returnValue;
+ }
+
+ public String getName() {
+ return Sandesha2Constants.OUT_HANDLER_NAME;
+ }
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Thu Oct 26 15:14:35 2006
@@ -53,7 +53,6 @@
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SOAPAbstractFactory;
@@ -69,13 +68,13 @@
private static final Log log = LogFactory.getLog(AckRequestedProcessor.class);
- public void processAckRequestedHeaders(MessageContext message) throws AxisFault {
+ public boolean processAckRequestedHeaders(MessageContext message) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processAckRequestHeaders");
SOAPEnvelope envelope = message.getEnvelope();
SOAPHeader header = envelope.getHeader();
-
+ boolean msgCtxPaused = false;
if(header!=null)
{
for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -86,16 +85,29 @@
OMElement ack = (OMElement) acks.next();
AckRequested ackReq = new AckRequested(headerName.getNamespaceURI());
ackReq.fromOMElement(ack);
- processAckRequestedHeader(message, ack, ackReq);
+ boolean paused = processAckRequestedHeader(message, ack, ackReq);
+ //if nto already paused we might be now
+ if(!msgCtxPaused){
+ msgCtxPaused = paused;
+ }
}
}
}
if (log.isDebugEnabled())
- log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders");
+ log.debug("Exit: AckRequestedProcessor::processAckRequestHeaders " + msgCtxPaused);
+ return msgCtxPaused;
}
- public void processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
+ /**
+ *
+ * @param msgContext
+ * @param soapHeader
+ * @param ackRequested
+ * @return true if the msg context was paused
+ * @throws AxisFault
+ */
+ public boolean processAckRequestedHeader(MessageContext msgContext, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
@@ -295,8 +307,10 @@
msgContext.pause();
if (log.isDebugEnabled())
- log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader");
+ log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.TRUE);
+ return true;
}
+ return false;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Thu Oct 26 15:14:35 2006
@@ -29,7 +29,6 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.engine.AxisEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -40,10 +39,8 @@
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.CreateSeqBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.util.AcknowledgementManager;
@@ -65,13 +62,18 @@
private static final Log log = LogFactory.getLog(AcknowledgementProcessor.class);
- public void processAckHeaders(MessageContext message) throws AxisFault {
+ /**
+ * @param message
+ * @return true if the msg context was paused
+ * @throws AxisFault
+ */
+ public boolean processAckHeaders(MessageContext message) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementProcessor::processAckHeaders");
SOAPEnvelope envelope = message.getEnvelope();
SOAPHeader header = envelope.getHeader();
-
+ boolean returnValue = false;
if(header!=null)
{
for(int i = 0; i < Sandesha2Constants.SPEC_NS_URIS.length; i++) {
@@ -82,21 +84,35 @@
OMElement ack = (OMElement) acks.next();
SequenceAcknowledgement seqAck = new SequenceAcknowledgement(headerName.getNamespaceURI());
seqAck.fromOMElement(ack);
- processAckHeader(message, ack, seqAck);
+ boolean ackPaused = processAckHeader(message, ack, seqAck);
+ //if not already paused we might be now
+ if(!returnValue){
+ returnValue = ackPaused;
+ }
}
}
}
if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementProcessor::processAckHeaders");
+ log.debug("Exit: AcknowledgementProcessor::processAckHeaders " + returnValue);
+ return returnValue;
}
- private void processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
+ /**
+ * @param msgCtx
+ * @param soapHeader
+ * @param sequenceAck
+ * @return true if the msg context was paused
+ * @throws AxisFault
+ */
+ private boolean processAckHeader(MessageContext msgCtx, OMElement soapHeader, SequenceAcknowledgement sequenceAck)
throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementProcessor::processAckHeader " + soapHeader);
+ boolean returnValue = false;
+
// TODO: Note that this RMMessageContext is not really any use - but we need to create it
// so that it can be passed to the fault handling chain. It's really no more than a
// container for the correct addressing and RM spec levels, so we'd be better off passing
@@ -247,11 +263,13 @@
String action = msgCtx.getOptions().getAction();
if (action!=null && action.equals(SpecSpecificConstants.getAckRequestAction(rmMsgCtx.getRMSpecVersion()))) {
+ returnValue = true;
rmMsgCtx.pause();
}
if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementProcessor::processAckHeader");
+ log.debug("Exit: AcknowledgementProcessor::processAckHeader " + returnValue);
+ return returnValue;
}
private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -86,10 +86,13 @@
private static final Log log = LogFactory.getLog(ApplicationMsgProcessor.class);
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processInMessage");
+ boolean msgCtxPaused = false;
+
// Processing the application message.
MessageContext msgCtx = rmMsgCtx.getMessageContext();
if (msgCtx == null) {
@@ -100,7 +103,7 @@
if (rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE) != null
&& rmMsgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE).equals("true")) {
- return;
+ return msgCtxPaused;
}
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(msgCtx.getConfigurationContext(),msgCtx.getConfigurationContext().getAxisConfiguration());
@@ -232,6 +235,7 @@
// this is a duplicate message and the invocation type is
// EXACTLY_ONCE.
rmMsgCtx.pause();
+ msgCtxPaused = true;
}
if (!msgNoPresentInList)
@@ -309,6 +313,7 @@
// pause the message
rmMsgCtx.pause();
+ msgCtxPaused = true;
// Starting the invoker if stopped.
SandeshaUtil.startInvokerForTheSequence(msgCtx.getConfigurationContext(), sequenceId);
@@ -319,7 +324,8 @@
sendAckIfNeeded(rmMsgCtx, messagesStr, storageManager);
if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processInMessage");
+ log.debug("Exit: ApplicationMsgProcessor::processInMessage " + msgCtxPaused);
+ return msgCtxPaused;
}
// TODO convert following from INT to LONG
@@ -461,7 +467,7 @@
log.debug("Exit: ApplicationMsgProcessor::sendAckIfNeeded");
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
@@ -866,7 +872,8 @@
msgContext.pause();
if (log.isDebugEnabled())
- log.debug("Exit: ApplicationMsgProcessor::processOutMessage");
+ log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
+ return true;
}
private void addCreateSequenceMessage(RMMsgContext applicationRMMsg, String sequencePropertyKey, String internalSequenceId, EndpointReference acksTo,
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Thu Oct 26 15:14:35 2006
@@ -1,168 +1,170 @@
-/*
- * Copyright 1999-2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.sandesha2.msgprocessors;
-
-import java.util.Iterator;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.util.Utils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sandesha2.RMMsgContext;
-import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.i18n.SandeshaMessageHelper;
-import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.security.SecurityManager;
-import org.apache.sandesha2.security.SecurityToken;
-import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
-import org.apache.sandesha2.storage.beans.SequencePropertyBean;
-import org.apache.sandesha2.util.AcknowledgementManager;
-import org.apache.sandesha2.util.FaultManager;
-import org.apache.sandesha2.util.RMMsgCreator;
-import org.apache.sandesha2.util.SOAPAbstractFactory;
-import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.wsrm.CloseSequence;
-import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
-
-/**
- * Responsible for processing an incoming Close Sequence message. (As introduced
- * by the WSRM 1.1 specification)
- */
-
-public class CloseSequenceProcessor implements MsgProcessor {
-
- private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
-
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
- if (log.isDebugEnabled())
- log.debug("Enter: CloseSequenceProcessor::processInMessage");
-
- ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
- CloseSequence closeSequence = (CloseSequence) rmMsgCtx
- .getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
-
- MessageContext msgCtx = rmMsgCtx.getMessageContext();
-
- String sequenceId = closeSequence.getIdentifier().getIdentifier();
- String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
-
- StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
- .getAxisConfiguration());
- SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
-
- // Check that the sender of this CloseSequence holds the correct token
- SequencePropertyBean tokenBean = sequencePropMgr.retrieve(sequenceId, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
- if(tokenBean != null) {
- SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
- OMElement body = msgCtx.getEnvelope().getBody();
- SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
- secManager.checkProofOfPossession(token, body, msgCtx);
- }
-
- FaultManager faultManager = new FaultManager();
- SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
- if (fault != null) {
- throw fault;
- }
-
- SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
- sequenceClosedBean.setSequencePropertyKey(sequencePropertyKey);
- sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
- sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
-
- sequencePropMgr.insert(sequenceClosedBean);
-
- RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
-
- MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
-
- String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
- ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
-
- SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
- .getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
-
- // Setting new envelope
- SOAPEnvelope envelope = factory.getDefaultEnvelope();
- try {
- ackMsgCtx.setEnvelope(envelope);
- } catch (AxisFault e3) {
- throw new SandeshaException(e3.getMessage());
- }
-
- // adding the ack part to the envelope.
- Iterator sequenceAckIter = ackRMMsgCtx
- .getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
-
- MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
-
- MessageContext closeSequenceResponseMsg = null;
-
- try {
- closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
- } catch (AxisFault e1) {
- throw new SandeshaException(e1);
- }
-
- RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg,
- storageManager);
-
- while (sequenceAckIter.hasNext()) {
- SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
- closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
- sequenceAcknowledgement);
- }
-
- closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
- closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-
- closeSequenceResponseMsg.setResponseWritten(true);
-
- closeSeqResponseRMMsg.addSOAPEnvelope();
-
- AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
-
- try {
- engine.send(closeSequenceResponseMsg);
- } catch (AxisFault e) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse,
- sequenceId, e.toString());
- throw new SandeshaException(message, e);
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: CloseSequenceProcessor::processInMessage");
- }
-
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
- if (log.isDebugEnabled()) {
- log.debug("Enter: CloseSequenceProcessor::processOutMessage");
- log.debug("Exit: CloseSequenceProcessor::processOutMessage");
- }
-
- }
-
-}
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.sandesha2.msgprocessors;
+
+import java.util.Iterator;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.util.Utils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.security.SecurityManager;
+import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.storage.beans.SequencePropertyBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.FaultManager;
+import org.apache.sandesha2.util.RMMsgCreator;
+import org.apache.sandesha2.util.SOAPAbstractFactory;
+import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.wsrm.CloseSequence;
+import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
+
+/**
+ * Responsible for processing an incoming Close Sequence message. (As introduced
+ * by the WSRM 1.1 specification)
+ */
+
+public class CloseSequenceProcessor implements MsgProcessor {
+
+ private static final Log log = LogFactory.getLog(CloseSequenceProcessor.class);
+
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ if (log.isDebugEnabled())
+ log.debug("Enter: CloseSequenceProcessor::processInMessage");
+
+ ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
+ CloseSequence closeSequence = (CloseSequence) rmMsgCtx
+ .getMessagePart(Sandesha2Constants.MessageParts.CLOSE_SEQUENCE);
+
+ MessageContext msgCtx = rmMsgCtx.getMessageContext();
+
+ String sequenceId = closeSequence.getIdentifier().getIdentifier();
+ String sequencePropertyKey = SandeshaUtil.getSequencePropertyKey(rmMsgCtx);
+
+ StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx
+ .getAxisConfiguration());
+ SequencePropertyBeanMgr sequencePropMgr = storageManager.getSequencePropertyBeanMgr();
+
+ // Check that the sender of this CloseSequence holds the correct token
+ SequencePropertyBean tokenBean = sequencePropMgr.retrieve(sequenceId, Sandesha2Constants.SequenceProperties.SECURITY_TOKEN);
+ if(tokenBean != null) {
+ SecurityManager secManager = SandeshaUtil.getSecurityManager(msgCtx.getConfigurationContext());
+ OMElement body = msgCtx.getEnvelope().getBody();
+ SecurityToken token = secManager.recoverSecurityToken(tokenBean.getValue());
+ secManager.checkProofOfPossession(token, body, msgCtx);
+ }
+
+ FaultManager faultManager = new FaultManager();
+ SandeshaException fault = faultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager);
+ if (fault != null) {
+ throw fault;
+ }
+
+ SequencePropertyBean sequenceClosedBean = new SequencePropertyBean();
+ sequenceClosedBean.setSequencePropertyKey(sequencePropertyKey);
+ sequenceClosedBean.setName(Sandesha2Constants.SequenceProperties.SEQUENCE_CLOSED);
+ sequenceClosedBean.setValue(Sandesha2Constants.VALUE_TRUE);
+
+ sequencePropMgr.insert(sequenceClosedBean);
+
+ RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
+
+ MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
+
+ String rmNamespaceValue = rmMsgCtx.getRMNamespaceValue();
+ ackRMMsgCtx.setRMNamespaceValue(rmNamespaceValue);
+
+ SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SandeshaUtil
+ .getSOAPVersion(rmMsgCtx.getSOAPEnvelope()));
+
+ // Setting new envelope
+ SOAPEnvelope envelope = factory.getDefaultEnvelope();
+ try {
+ ackMsgCtx.setEnvelope(envelope);
+ } catch (AxisFault e3) {
+ throw new SandeshaException(e3.getMessage());
+ }
+
+ // adding the ack part to the envelope.
+ Iterator sequenceAckIter = ackRMMsgCtx
+ .getMessageParts(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT);
+
+ MessageContext closeSequenceMsg = rmMsgCtx.getMessageContext();
+
+ MessageContext closeSequenceResponseMsg = null;
+
+ try {
+ closeSequenceResponseMsg = Utils.createOutMessageContext(closeSequenceMsg);
+ } catch (AxisFault e1) {
+ throw new SandeshaException(e1);
+ }
+
+ RMMsgContext closeSeqResponseRMMsg = RMMsgCreator.createCloseSeqResponseMsg(rmMsgCtx, closeSequenceResponseMsg,
+ storageManager);
+
+ while (sequenceAckIter.hasNext()) {
+ SequenceAcknowledgement sequenceAcknowledgement = (SequenceAcknowledgement) sequenceAckIter.next();
+ closeSeqResponseRMMsg.setMessagePart(Sandesha2Constants.MessageParts.SEQ_ACKNOWLEDGEMENT,
+ sequenceAcknowledgement);
+ }
+
+ closeSeqResponseRMMsg.setFlow(MessageContext.OUT_FLOW);
+ closeSeqResponseRMMsg.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+
+ closeSequenceResponseMsg.setResponseWritten(true);
+
+ closeSeqResponseRMMsg.addSOAPEnvelope();
+
+ AxisEngine engine = new AxisEngine(closeSequenceMsg.getConfigurationContext());
+
+ try {
+ engine.send(closeSequenceResponseMsg);
+ } catch (AxisFault e) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse,
+ sequenceId, e.toString());
+ throw new SandeshaException(message, e);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: CloseSequenceProcessor::processInMessage " + Boolean.FALSE);
+ return false;
+ }
+
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: CloseSequenceProcessor::processOutMessage");
+ log.debug("Exit: CloseSequenceProcessor::processOutMessage " + Boolean.FALSE);
+ }
+ return false;
+
+ }
+
+}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -66,7 +66,7 @@
private static final Log log = LogFactory.getLog(CreateSeqMsgProcessor.class);
- public void processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
+ public boolean processInMessage(RMMsgContext createSeqRMMsg) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqMsgProcessor::processInMessage");
@@ -264,7 +264,8 @@
createSeqRMMsg.pause();
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::processInMessage");
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
}
private boolean offerAccepted(String sequenceId, ConfigurationContext configCtx, RMMsgContext createSeqRMMsg,
@@ -300,7 +301,7 @@
return true;
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
@@ -316,7 +317,8 @@
}
}
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::processOutMessage");
+ log.debug("Exit: CreateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+ return false;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -61,7 +61,7 @@
private static final Log log = LogFactory.getLog(CreateSeqResponseMsgProcessor.class);
- public void processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
+ public boolean processInMessage(RMMsgContext createSeqResponseRMMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: CreateSeqResponseMsgProcessor::processInMessage");
@@ -307,14 +307,16 @@
createSeqResponseRMMsgCtx.pause();
if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage");
+ log.debug("Exit: CreateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
if (log.isDebugEnabled()) {
log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
- log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage");
+ log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
}
+ return false;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Thu Oct 26 15:14:35 2006
@@ -38,7 +38,7 @@
* A message is selected by the set of SenderBeans that are waiting to be sent.
* This is processed using a SenderWorker.
*/
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
Address address = makeConnection.getAddress();
@@ -97,7 +97,7 @@
}
if (senderBean==null)
- return;
+ return false;
TransportOutDescription transportOut = rmMsgCtx.getMessageContext().getTransportOut();
if (transportOut==null) {
@@ -128,6 +128,7 @@
worker.setTransportOut(rmMsgCtx.getMessageContext().getTransportOut());
worker.run();
+ return false;
}
private void addMessagePendingHeader (RMMsgContext returnMessage, boolean pending) throws SandeshaException {
@@ -139,7 +140,8 @@
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ return false;
}
private void setTransportProperties (MessageContext returnMessage, RMMsgContext makeConnectionMessage) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -25,6 +25,19 @@
*/
public interface MsgProcessor {
- public void processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+
+ /**
+ * @param rmMsgCtx
+ * @return true if the msg context has been paused
+ * @throws AxisFault
+ */
+ public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault;
+
+ /**
+ *
+ * @param rmMsgCtx
+ * @return true if the msg context has been paused
+ * @throws AxisFault
+ */
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -70,7 +70,7 @@
private static final Log log = LogFactory.getLog(TerminateSeqMsgProcessor.class);
- public void processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
+ public boolean processInMessage(RMMsgContext terminateSeqRMMsg) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
@@ -177,7 +177,8 @@
terminateSeqMsg.pause();
if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::processInMessage");
+ log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
}
private void setUpHighestMsgNumbers(ConfigurationContext configCtx, StorageManager storageManager,
@@ -327,7 +328,7 @@
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");
@@ -393,7 +394,7 @@
if (terminated != null && "true".equals(terminated)) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.terminateAddedPreviously);
log.debug(message);
- return;
+ return false;
}
TerminateSequence terminateSequencePart = (TerminateSequence) rmMsgCtx
@@ -472,7 +473,8 @@
SandeshaUtil.executeAndStore(rmMsgCtx, key);
if (log.isDebugEnabled())
- log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage");
+ log.debug("Exit: TerminateSeqMsgProcessor::processOutMessage " + Boolean.FALSE);
+ return false;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Thu Oct 26 15:14:35 2006
@@ -42,7 +42,7 @@
private static final Log log = LogFactory.getLog(TerminateSeqResponseMsgProcessor.class);
- public void processInMessage(RMMsgContext terminateResRMMsg)
+ public boolean processInMessage(RMMsgContext terminateResRMMsg)
throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processInMessage");
@@ -79,11 +79,13 @@
// Stop this message travelling further through the Axis runtime
terminateResRMMsg.pause();
- if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage");
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processInMessage " + Boolean.TRUE);
+ return true;
}
- public void processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
+ public boolean processOutMessage(RMMsgContext rmMsgCtx) throws SandeshaException {
if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
- if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage");
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
+ return false;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java?view=diff&rev=468170&r1=468169&r2=468170
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/transport/Sandesha2TransportSender.java Thu Oct 26 15:14:35 2006
@@ -46,7 +46,7 @@
}
- public void invoke(MessageContext msgContext) throws AxisFault {
+ public InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: Sandesha2TransportSender::invoke, " + msgContext.getEnvelope().getHeader());
@@ -77,7 +77,7 @@
if (log.isDebugEnabled())
log.debug("Exit: Sandesha2TransportSender::invoke");
-
+ return InvocationResponse.CONTINUE;
}
//Below methods are not used
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org