You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ml...@apache.org on 2007/02/02 20:29:58 UTC
svn commit: r502698 - in /webservices/sandesha/trunk/java: ./ config/
src/org/apache/sandesha2/ src/org/apache/sandesha2/handlers/
src/org/apache/sandesha2/i18n/ src/org/apache/sandesha2/msgprocessors/
src/org/apache/sandesha2/policy/ src/org/apache/sa...
Author: mlovett
Date: Fri Feb 2 11:29:56 2007
New Revision: 502698
URL: http://svn.apache.org/viewvc?view=rev&rev=502698
Log:
Add a config option to use message serialization, see SANDESHA2-70
Added:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java (with props)
Modified:
webservices/sandesha/trunk/java/config/module.xml
webservices/sandesha/trunk/java/maven.xml
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java
Modified: webservices/sandesha/trunk/java/config/module.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Fri Feb 2 11:29:56 2007
@@ -116,6 +116,7 @@
<sandesha2:UseRMAnonURI>true</sandesha2:UseRMAnonURI>
</sandesha2:MakeConnection>
+ <!-- <sandesha2:UseMessageSerialization>true</sandesha2:UseMessageSerialization> -->
</wsp:Policy>
</sandesha2:RMAssertion>
</wsp:Policy>
Modified: webservices/sandesha/trunk/java/maven.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/maven.xml?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/maven.xml (original)
+++ webservices/sandesha/trunk/java/maven.xml Fri Feb 2 11:29:56 2007
@@ -194,10 +194,41 @@
<delete dir="${build.temp.dir}"/>
</goal>
+ <goal name="serialize:create" prereqs="server:create,client:create">
+ <ant:property name="serialize.client.path" value="${build.repo.dir}/serialize-client"/>
+ <ant:property name="serialize.server.path" value="${build.repo.dir}/serialize-server"/>
+
+ <ant:property name="serialize.temp.path" value="${build.temp.dir}/serialize" />
+ <ant:mkdir dir="${serialize.temp.path}" />
+
+ <copy todir="${serialize.client.path}">
+ <fileset dir="${build.repo.dir}/client">
+ <exclude name="modules/${mar.name}"/>
+ </fileset>
+ </copy>
+
+ <copy todir="${serialize.server.path}">
+ <fileset dir="${build.repo.dir}/server">
+ <exclude name="modules/${mar.name}"/>
+ </fileset>
+ </copy>
+
+ <!-- Switch on serialization in the module.xml file -->
+ <ant:unjar src="${maven.build.dir}/${mar.name}" dest="${serialize.temp.path}"/>
+ <ant:replace file="${serialize.temp.path}/META-INF/module.xml"
+ token="<!-- <sandesha2:UseMessageSerialization>true</sandesha2:UseMessageSerialization> -->"
+ value="<sandesha2:UseMessageSerialization>true</sandesha2:UseMessageSerialization>"/>
+ <ant:jar jarfile="${serialize.client.path}/modules/${test.module.name}.mar" basedir="${serialize.temp.path}"/>
+ <ant:jar jarfile="${serialize.server.path}/modules/${test.module.name}.mar" basedir="${serialize.temp.path}"/>
+
+ <delete dir="${build.temp.dir}"/>
+ </goal>
+
<goal name="repo:create">
<attainGoal name="server:create"/>
<attainGoal name="client:create"/>
<attainGoal name="secure:create"/>
+ <attainGoal name="serialize:create"/>
</goal>
<goal name="server:create" prereqs="mar,sample:create">
@@ -304,7 +335,7 @@
<ant:copy file="${build.interop.dir}/${interop.service.aar.name}" toDir="${build.repo.dir}/server/services" overwrite="true"/>
<ant:copy file="${dir.interop}/conf/sec-services.xml" toFile="${dir.interop.service.temp}/META-INF/services.xml" overwrite="true"/>
- <ant:copy file="${dir.interop}/conf/SecRMInteropService.wsdl" toFile="${dir.interop.service.temp}/META-INF/RMInteropService.wsdl" overwrite="true"/>
+ <ant:copy file="${dir.interop}/conf/SecRMInteropService.wsdl" toFile="${dir.interop.service.temp}/META-INF/RMInteropService.wsdl" overwrite="true"/>
<ant:copy file="${dir.interop}/conf/store.jks" toFile="${dir.interop.service.temp}/store.jks" overwrite="true"/>
<ant:jar jarfile="${build.interop.dir}/${interop.sec.service.aar.name}" basedir="${dir.interop.service.temp}" overwrite="true"/>
<ant:copy file="${build.interop.dir}/${interop.sec.service.aar.name}" toDir="${build.repo.dir}/server/services" overwrite="true"/>
@@ -393,8 +424,8 @@
<ant:copy file="target/${jar.name}" todir="${dir.temp.dist.bin}" />
<ant:copy file="target/${mar.name}" todir="${dir.temp.dist.bin}" />
- <ant:copy file="target/${client.jar.name}" todir="${dir.temp.dist.bin}" />
- <ant:copy file="target/${policy.jar.name}" todir="${dir.temp.dist.bin}" />
+ <ant:copy file="target/${client.jar.name}" todir="${dir.temp.dist.bin}" />
+ <ant:copy file="target/${policy.jar.name}" todir="${dir.temp.dist.bin}" />
<!-- <ant:copy file="${dir.config}/sandesha2.properties" todir="${dir.temp.dist.bin}" /> -->
<ant:copy file="${apache.license.file}" todir="${dir.temp.dist.bin}" />
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Fri Feb 2 11:29:56 2007
@@ -355,6 +355,8 @@
String EnableRMAnonURI = "EnableRMAnonURI";
+ String UseMessageSerialization = "UseMessageSerialization";
+
public interface DefaultValues {
int RetransmissionInterval = 6000;
@@ -386,6 +388,8 @@
boolean EnableMakeConnection = true;
boolean EnableRMAnonURI = true;
+
+ boolean UseMessageSerialization = false;
}
}
@@ -528,6 +532,7 @@
public static final String ELEM_MAKE_CONNECTION = "MakeConnection";
public static final String ELEM_ENABLED = "Enabled";
public static final String ELEM_USE_RM_ANON_URI = "UseRMAnonURI";
+ public static final String ELEM_USE_SERIALIZATION = "UseMessageSerialization";
public static final QName Q_ELEM_POLICY = new QName(URI_POLICY_NS, ELEM_POLICY, ATTR_WSP);
public static final QName Q_ELEM_RMASSERTION = new QName(URI_RM_POLICY_NS, ELEM_RMASSERTION, ATTR_WSRM);
@@ -547,5 +552,6 @@
public static final QName Q_ELEM_MAKE_CONNECTION = new QName(URI_RM_POLICY_NS, ELEM_MAKE_CONNECTION, ATTR_WSRM);
public static final QName Q_ELEM_ENABLED = new QName(URI_RM_POLICY_NS, ELEM_ENABLED, ATTR_WSRM);
public static final QName Q_ELEM_USE_RM_ANON_URI = new QName(URI_RM_POLICY_NS, ELEM_USE_RM_ANON_URI, ATTR_WSRM);
+ public static final QName Q_ELEM_USE_SERIALIZATION = new QName(URI_RM_POLICY_NS, ELEM_USE_SERIALIZATION, ATTR_WSRM);
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Fri Feb 2 11:29:56 2007
@@ -20,7 +20,6 @@
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
@@ -33,17 +32,13 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.msgprocessors.MakeConnectionProcessor;
import org.apache.sandesha2.msgprocessors.SequenceProcessor;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
-import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
-import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.Range;
-import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.Sequence;
@@ -108,48 +103,10 @@
transaction = storageManager.getTransaction();
RMMsgContext rmMessageContext = MsgInitializer.initializeMessage(msgContext);
- EndpointReference replyTo = msgContext.getReplyTo();
- String specVersion = rmMessageContext.getRMSpecVersion();
- boolean duplicateMessage = isDuplicateMessage (rmMessageContext, storageManager);
-
- //checking weather the Message belongs to the WSRM 1.0 Anonymous InOut scenario.
- //If so instead of simply dropping duplicates we will have to attach the corresponding response,
- //as long as the Message has not been acked.
- if (duplicateMessage &&
- (replyTo==null || replyTo.hasAnonymousAddress()) &&
- (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
-
- // Treat the duplicate message as a special kind of poll
- Sequence sequence = (Sequence) rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
-
- SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
- SenderBean findSenderBean = new SenderBean ();
- findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
- findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
- findSenderBean.setSend(true);
-
- SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
-
- // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
- // processor. This will use this thread to re-send the reply, writing it into the transport.
- // As the reply is now written we do not want to continue processing, or suspend, so we abort.
- if(replyMessageBean != null) {
- if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
- MakeConnectionProcessor.replyToPoll(rmMessageContext, replyMessageBean, storageManager, false, null);
- } else {
- if(log.isDebugEnabled()) log.debug("No matching reply for replayed message");
- }
-
- returnValue = InvocationResponse.ABORT;
- if (log.isDebugEnabled()) log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
- return returnValue;
- }
-
boolean shouldMessageBeDropped = shouldMessageBeDropped (rmMessageContext, storageManager);
- if (duplicateMessage || shouldMessageBeDropped) {
+ if (shouldMessageBeDropped) {
returnValue = InvocationResponse.ABORT; // the msg has been
// dropped
@@ -167,11 +124,7 @@
}
} catch (Exception e) {
- if (log.isDebugEnabled())
- log.debug("Caught an exception", e);
- // message should not be sent in a exception situation.
- msgContext.pause();
- returnValue = InvocationResponse.SUSPEND;
+ if (log.isDebugEnabled()) log.debug("Caught an exception", e);
if (transaction != null) {
try {
@@ -203,31 +156,6 @@
log.debug("Exit: SandeshaGlobalInHandler::invoke " + returnValue);
return returnValue;
}
-
-
- private boolean isDuplicateMessage (RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {
- boolean duplicate = false;
-
- if (rmMsgContext.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
-
- Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- long msgNo = sequence.getMessageNumber().getMessageNumber();
-
- RMDBean rmdBean =
- SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequence.getIdentifier().getIdentifier());
-
- if (rmdBean != null) {
- RangeString serverCompletedMessages = rmdBean.getServerCompletedMessages();
-
- if (serverCompletedMessages.isMessageNumberInRanges(msgNo))
- duplicate = true;
- }
-
- }
-
- return duplicate;
- }
-
private boolean shouldMessageBeDropped(RMMsgContext rmMsgContext, StorageManager storageManager) throws AxisFault {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Fri Feb 2 11:29:56 2007
@@ -254,6 +254,8 @@
public static final String maximumRetransmissionCountProcessor="maximumRetransmissionCountProcessor";
public static final String nullMsgId="nullMsgId";
public static final String storageMapNotPresent="storageMapNotPresent";
+ public static final String failedToStoreMessage="failedToStoreMessage";
+ public static final String failedToLoadMessage="failedToLoadMessage";
public static final String entryNotPresentForUpdating="entryNotPresentForUpdating";
public static final String appMsgIsNull="appMsgIsNull";
public static final String invalidMsgNumberList="invalidMsgNumberList";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Fri Feb 2 11:29:56 2007
@@ -269,6 +269,8 @@
maximumRetransmissionCountProcessor=MaximumRetransmissionCountProcessor:doAcknowledgementInterval
nullMsgId=Key (MessageId) is null. Cannot insert.
storageMapNotPresent=Error: storage Map not present
+failedToStoreMessage=Failed to store message due to exception {0}.
+failedToLoadMessage=Failed to load message due to exception {0}.
entryNotPresentForUpdating=Entry is not present for updating
appMsgIsNull=Application message is null
invalidMsgNumberList=Invalid msg number list
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Fri Feb 2 11:29:56 2007
@@ -17,8 +17,8 @@
package org.apache.sandesha2.msgprocessors;
-import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import javax.xml.namespace.QName;
@@ -29,9 +29,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
-import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2004Constants;
import org.apache.axis2.wsdl.WSDLConstants.WSDL20_2006Constants;
import org.apache.commons.logging.Log;
@@ -43,6 +41,7 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -56,7 +55,6 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
-import org.apache.sandesha2.wsrm.Nack;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
/**
@@ -135,7 +133,6 @@
if(log.isDebugEnabled()) log.debug("Got Ack for RM Sequence: " + outSequenceId + ", internalSeqId: " + internalSequenceId);
Iterator ackRangeIterator = sequenceAck.getAcknowledgementRanges().iterator();
- Iterator nackIterator = sequenceAck.getNackList().iterator();
if (FaultManager.checkForUnknownSequence(rmMsgCtx, outSequenceId, storageManager)) {
if (log.isDebugEnabled())
@@ -149,106 +146,72 @@
return;
}
- SenderBean input = new SenderBean();
- input.setSend(true);
- input.setReSend(true);
- input.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
- input.setInternalSequenceID(internalSequenceId);
- Collection retransmitterEntriesOfSequence = retransmitterMgr.find(input);
-
String replyToAddress = rmsBean.getReplyToEPR();
EndpointReference replyTo = new EndpointReference (replyToAddress);
- boolean anonReplyTo = false;
- if (replyTo.hasAnonymousAddress())
- anonReplyTo = true;
+ boolean anonReplyTo = replyTo.hasAnonymousAddress();
String rmVersion = rmMsgCtx.getRMSpecVersion();
- boolean syncResponseExpected = false;
- RangeString ackedMessagesRanges = new RangeString(); //keep track of the ranges in the ack msgs
+ // Compare the clientCompletedMessages with the range we just got, to work out if there
+ // is any new information in this ack message
+ RangeString completedMessages = rmsBean.getClientCompletedMessages();
long numberOfNewMessagesAcked = 0;
- while (ackRangeIterator.hasNext()) {
+ while(ackRangeIterator.hasNext()) {
AcknowledgementRange ackRange = (AcknowledgementRange) ackRangeIterator.next();
long lower = ackRange.getLowerValue();
long upper = ackRange.getUpperValue();
- if(log.isDebugEnabled())
- log.debug("Ack Range: " + lower + " - " + upper);
-
- long rangeStart = lower;
- long messageNo;
- for (messageNo = lower; messageNo <= upper; messageNo++) {
- SenderBean retransmitterBean = getRetransmitterEntry(retransmitterEntriesOfSequence, messageNo);
-
- if (retransmitterBean != null) {
-
- // We've got an Ack for a message that hasn't been sent yet !
- if (retransmitterBean.getSentCount() == 0) {
- FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
- storageManager, SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackInvalidNotSent));
- if (log.isDebugEnabled())
- log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
- return;
- }
-
- String storageKey = retransmitterBean.getMessageContextRefKey();
-
- boolean syncResponseNeeded = false;
- if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
- MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
- AxisOperation operation = applicationMessage.getAxisOperation();
- boolean inOutMessage = false;
- if (operation!=null &&
- (WSDL20_2004Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()) ||
- WSDL20_2006Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern())))
- inOutMessage = true;
+ // Quick check to see if the whole range is covered
+ if(!completedMessages.isRangeCompleted(new Range(lower, upper))) {
+ // We have new info, so take each message one at a time
+ for (long messageNo = lower; messageNo <= upper; messageNo++) {
+ if(!completedMessages.isMessageNumberInRanges(messageNo)) {
+ // We have a new message to consider
+ numberOfNewMessagesAcked++;
+ completedMessages.addRange(new Range(messageNo, messageNo));
+
+ SenderBean matcher = new SenderBean();
+ matcher.setSequenceID(outSequenceId);
+ matcher.setMessageNumber(messageNo);
+
+ SenderBean retransmitterBean = retransmitterMgr.findUnique(matcher);
+ if (retransmitterBean != null) {
+ // Check we haven't got an Ack for a message that hasn't been sent yet !
+ if (retransmitterBean.getSentCount() == 0) {
+ FaultManager.makeInvalidAcknowledgementFault(rmMsgCtx, sequenceAck, ackRange,
+ storageManager, SandeshaMessageHelper.getMessage(SandeshaMessageKeys.ackInvalidNotSent));
+ if (log.isDebugEnabled())
+ log.debug("Exit: AcknowledgementProcessor::processAckHeader, Invalid Ack");
+ return;
+ }
+
+ String storageKey = retransmitterBean.getMessageContextRefKey();
- if (inOutMessage) {
- OperationContext operationContext = applicationMessage.getOperationContext();
- if (operationContext!=null) {
- MessageContext responseMessage = operationContext.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- if (responseMessage==null) {
+ boolean syncResponseNeeded = false;
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && anonReplyTo) {
+ MessageContext applicationMessage = storageManager.retrieveMessageContext(storageKey, configCtx);
+ AxisOperation operation = applicationMessage.getAxisOperation();
+ if (operation!=null &&
+ (WSDL20_2004Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern()) ||
+ WSDL20_2006Constants.MEP_URI_OUT_IN.equals(operation.getMessageExchangePattern())))
syncResponseNeeded = true;
-
- //adding upto the current messageNo
- //current one will not be considered as we need to get an sync response from an repeat of this.
- if (rangeStart<messageNo) {
- ackedMessagesRanges.addRange(new Range(rangeStart, messageNo-1));
- rangeStart = messageNo+1;
- }
- }
}
- }
- }
- if (!syncResponseNeeded) {
- // removing the application message from the storage.
- retransmitterMgr.delete(retransmitterBean.getMessageID());
- storageManager.removeMessageContext(storageKey);
- numberOfNewMessagesAcked++;
- } else {
- //sync response is needed at least for one message, this should stop the termination.
- syncResponseExpected = true;
+ if (!syncResponseNeeded) {
+ // removing the application message from the storage.
+ retransmitterMgr.delete(retransmitterBean.getMessageID());
+ storageManager.removeMessageContext(storageKey);
+ }
+ }
}
}
}
-
- if (rangeStart<=upper)
- ackedMessagesRanges.addRange(new Range (rangeStart, upper));
-
}
// updating the last activated time of the sequence.
rmsBean.setLastActivatedTime(System.currentTimeMillis());
- while (nackIterator.hasNext()) {
- Nack nack = (Nack) nackIterator.next();
- long msgNo = nack.getNackNumber();
-
- // TODO - Process Nack
- }
-
//adding a MakeConnection for the response sequence if needed.
if (rmsBean.getOfferedSequence() != null) {
@@ -260,75 +223,25 @@
}
- // setting acked message date.
- // TODO add details specific to each message.
-
// We overwrite the previous client completed message ranges with the
// latest view, but only if it is an update i.e. contained a new
// ack range (which is because we do not previous acks arriving late
// to break us)
if (numberOfNewMessagesAcked>0) {
- rmsBean.setClientCompletedMessages(ackedMessagesRanges);
- long noOfMsgsAcked =
- rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
+ rmsBean.setClientCompletedMessages(completedMessages);
+ long noOfMsgsAcked = rmsBean.getNumberOfMessagesAcked() + numberOfNewMessagesAcked;
rmsBean.setNumberOfMessagesAcked(noOfMsgsAcked);
}
- long lastOutMessage = rmsBean.getLastOutMessage ();
-
// Update the RMSBean
storageManager.getRMSBeanMgr().update(rmsBean);
- if (lastOutMessage > 0) {
- boolean complete = AcknowledgementManager.verifySequenceCompletion(sequenceAck
- .getAcknowledgementRanges().iterator(), lastOutMessage);
-
-
- //If this is RM 1.1 and RMAnonURI scenario, dont do the termination unless the response side createSequence has been
- //received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
- boolean pauseTerminationForCS = false;
- if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals( rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
- pauseTerminationForCS = true;
-
- RMDBean findBean = new RMDBean ();
- findBean.setPollingMode(true);
-
- RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
- Iterator rmdBeanIteratort = rmdBeanMgr.find(findBean).iterator();
- while (rmdBeanIteratort.hasNext()) {
- RMDBean rmdBean = (RMDBean) rmdBeanIteratort.next();
- String toAddress = rmdBean.getToAddress();
- if (toAddress!=null && toAddress.equals(replyToAddress)) {
- pauseTerminationForCS = false;
- break;
- }
- }
-
- if (pauseTerminationForCS) {
- rmsBean.setTerminationPauserForCS(true);
- storageManager.getRMSBeanMgr().update(rmsBean);
- }
- }
-
-
- if (complete && !pauseTerminationForCS && !syncResponseExpected && !rmsBean.isTerminateAdded()) {
- TerminateManager.addTerminateSequenceMessage(rmMsgCtx, internalSequenceId, outSequenceId, storageManager);
- }
-
- }
+ // Try and terminate the sequence
+ TerminateManager.checkAndTerminate(rmMsgCtx, storageManager, rmsBean);
if (log.isDebugEnabled())
log.debug("Exit: AcknowledgementProcessor::processAckHeader");
}
- private SenderBean getRetransmitterEntry(Collection collection, long msgNo) {
- Iterator it = collection.iterator();
- while (it.hasNext()) {
- SenderBean bean = (SenderBean) it.next();
- if (bean.getMessageNumber() == msgNo)
- return bean;
- }
- return null;
- }
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Fri Feb 2 11:29:56 2007
@@ -225,28 +225,25 @@
RMSBean findBean = new RMSBean ();
findBean.setReplyToEPR(toAddress);
+ findBean.setTerminationPauserForCS(true);
//TODO recheck
RMSBean rmsBean = storageManager.getRMSBeanMgr().findUnique(findBean);
- if (rmsBean!=null) {
-
- if (rmsBean.isTerminationPauserForCS()) {
- //AckManager hs not done the termination. Do the termination here.
-
- MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
- if (requestSideRefMessage==null) {
- FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
- SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
- new Exception());
- // Return false if an Exception hasn't been thrown.
- if (log.isDebugEnabled())
- log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
- return false;
- }
-
- RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
- TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+ if (rmsBean!=null) {
+ //AckManager hs not done the termination. Do the termination here.
+ MessageContext requestSideRefMessage = storageManager.retrieveMessageContext(rmsBean.getReferenceMessageStoreKey(),context);
+ if (requestSideRefMessage==null) {
+ FaultManager.makeCreateSequenceRefusedFault(createSeqRMMsg,
+ SandeshaMessageHelper.getMessage(SandeshaMessageKeys.referencedMessageNotFound, rmsBean.getInternalSequenceID()),
+ new Exception());
+ // Return false if an Exception hasn't been thrown.
+ if (log.isDebugEnabled())
+ log.debug("Exit: CreateSeqMsgProcessor::processInMessage " + Boolean.FALSE);
+ return false;
}
+
+ RMMsgContext requestSideRefRMMessage = MsgInitializer.initializeMessage(requestSideRefMessage);
+ TerminateManager.addTerminateSequenceMessage(requestSideRefRMMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Fri Feb 2 11:29:56 2007
@@ -9,6 +9,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -117,10 +118,12 @@
String namespace)
throws AxisFault
{
+ if(log.isDebugEnabled()) log.debug("Entry: MakeConnectionProcessor::replyToPoll");
TransportOutDescription transportOut = pollMessage.getMessageContext().getTransportOut();
if (transportOut==null) {
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
+ if(log.isDebugEnabled()) log.debug(message);
throw new SandeshaException (message);
}
@@ -128,6 +131,7 @@
MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,pollMessage.getConfigurationContext());
if (returnMessage==null) {
String message = "Cannot find the message stored with the key:" + messageStorageKey;
+ if(log.isDebugEnabled()) log.debug(message);
throw new SandeshaException (message);
}
@@ -138,6 +142,12 @@
// Link the response to the request
OperationContext context = pollMessage.getMessageContext().getOperationContext();
+ if(context == null) {
+ AxisOperation oldOperation = returnMessage.getAxisOperation();
+ context = new OperationContext(oldOperation);
+ context.addMessageContext(pollMessage.getMessageContext());
+ pollMessage.getMessageContext().setOperationContext(context);
+ }
context.addMessageContext(returnMessage);
returnMessage.setOperationContext(context);
@@ -145,12 +155,13 @@
//running the MakeConnection through a SenderWorker.
//This will allow Sandesha2 to consider both of following senarios equally.
- // 1. A message being sent by the Sender thread.
+ // 1. A message being sent by the Sender thread.
// 2. A message being sent as a reply to an MakeConnection.
SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage);
worker.setMessage(returnRMMsg);
-
worker.run();
+
+ if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
}
private static void addMessagePendingHeader (MessageContext returnMessage, String namespace) throws SandeshaException {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Fri Feb 2 11:29:56 2007
@@ -24,6 +24,7 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.engine.Handler.InvocationResponse;
@@ -41,13 +42,17 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.RMDBean;
+import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.FaultManager;
import org.apache.sandesha2.util.Range;
import org.apache.sandesha2.util.RangeString;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -164,8 +169,6 @@
}
EndpointReference replyTo = rmMsgCtx.getReplyTo();
- String mep = msgCtx.getAxisOperation().getMessageExchangePattern();
-
String key = SandeshaUtil.getUUID(); // key to store the message.
// updating the Highest_In_Msg_No property which gives the highest
// message number retrieved from this sequence.
@@ -191,10 +194,47 @@
boolean msgNoPresentInList =
serverCompletedMessageRanges.isMessageNumberInRanges(msgNo);
+ String specVersion = rmMsgCtx.getRMSpecVersion();
if (msgNoPresentInList
&& (Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE == Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
- // this is a duplicate message and the invocation type is
- // EXACTLY_ONCE.
+ // this is a duplicate message and the invocation type is EXACTLY_ONCE. We try to return
+ // ack messages at this point, as if someone is sending duplicates then they may have
+ // missed earlier acks. We also have special processing for sync 2-way with RM 1.0
+ if((replyTo==null || replyTo.hasAnonymousAddress()) &&
+ (specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0))) {
+
+ SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+ SenderBean findSenderBean = new SenderBean ();
+ findSenderBean.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ findSenderBean.setInboundSequenceId(sequence.getIdentifier().getIdentifier());
+ findSenderBean.setInboundMessageNumber(sequence.getMessageNumber().getMessageNumber());
+ findSenderBean.setSend(true);
+
+ SenderBean replyMessageBean = senderBeanMgr.findUnique(findSenderBean);
+
+ // this is effectively a poll for the replyMessage, wo re-use the logic in the MakeConnection
+ // processor. This will use this thread to re-send the reply, writing it into the transport.
+ // As the reply is now written we do not want to continue processing, or suspend, so we abort.
+ if(replyMessageBean != null) {
+ if(log.isDebugEnabled()) log.debug("Found matching reply for replayed message");
+ MakeConnectionProcessor.replyToPoll(rmMsgCtx, replyMessageBean, storageManager, false, null);
+ result = InvocationResponse.ABORT;
+ if (log.isDebugEnabled())
+ log.debug("Exit: SequenceProcessor::processReliableMessage, replayed message: " + result);
+ return result;
+ }
+ }
+ EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
+ if (acksTo.hasAnonymousAddress()) {
+ RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx , sequenceId, storageManager,false,true);
+ msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, Constants.VALUE_TRUE);
+ AcknowledgementManager.sendAckNow(ackRMMsgContext);
+ result = InvocationResponse.ABORT;
+ if (log.isDebugEnabled())
+ log.debug("Exit: SequenceProcessor::processReliableMessage, acking duplicate message: " + result);
+ return result;
+ }
+
result = InvocationResponse.ABORT;
if (log.isDebugEnabled())
log.debug("Exit: SequenceProcessor::processReliableMessage, dropping duplicate: " + result);
@@ -208,6 +248,29 @@
// Update the RMD bean
mgr.update(bean);
+
+ // If we are doing sync 2-way over WSRM 1.0, then we may just have received one of
+ // the reply messages that we were looking for. If so we can remove the matching sender bean.
+ int mep = msgCtx.getAxisOperation().getAxisSpecifMEPConstant();
+ if(specVersion!=null && specVersion.equals(Sandesha2Constants.SPEC_VERSIONS.v1_0) &&
+ mep == WSDL20_2004Constants.MEP_CONSTANT_OUT_IN) {
+ RelatesTo relatesTo = msgCtx.getRelatesTo();
+ if(relatesTo != null) {
+ String messageId = relatesTo.getValue();
+ SenderBean matcher = new SenderBean();
+ matcher.setMessageID(messageId);
+ SenderBean sender = storageManager.getSenderBeanMgr().findUnique(matcher);
+ if(sender != null) {
+ if(log.isDebugEnabled()) log.debug("Deleting sender for sync-2-way message");
+ storageManager.removeMessageContext(sender.getMessageContextRefKey());
+ storageManager.getSenderBeanMgr().delete(messageId);
+
+ // Try and terminate the corresponding outbound sequence
+ RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sender.getSequenceID());
+ TerminateManager.checkAndTerminate(rmMsgCtx, storageManager, rmsBean);
+ }
+ }
+ }
// inorder invocation is still a global property
boolean inOrderInvocation = SandeshaUtil.getPropertyBean(
@@ -226,7 +289,7 @@
// add an ack entry here
boolean backchannelFree = (replyTo != null && !replyTo.hasAnonymousAddress()) ||
- WSDL20_2004Constants.MEP_URI_IN_ONLY.equals(mep);
+ WSDL20_2004Constants.MEP_CONSTANT_IN_ONLY == mep;
EndpointReference acksTo = new EndpointReference (bean.getAcksToEPR());
if (acksTo.hasAnonymousAddress() && backchannelFree) {
Object responseWritten = msgCtx.getOperationContext().getProperty(Constants.RESPONSE_WRITTEN);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/SandeshaPolicyBean.java Fri Feb 2 11:29:56 2007
@@ -63,6 +63,8 @@
private boolean enableMakeConnection;
private boolean enableRMAnonURI;
+
+ private boolean useMessageSerialization;
public void setInactiveTimeoutInterval(long value, String measure) {
long timeOut = -1;
@@ -270,6 +272,11 @@
// </wsrm:MakeConnection>
writer.writeEndElement();
+ // <wsrm:UseMessageSerialization />
+ writer.writeStartElement(prefix, Sandesha2Constants.Assertions.Q_ELEM_USE_SERIALIZATION.getLocalPart(), namespaceURI);
+ writer.writeCharacters(Boolean.toString(isUseMessageSerialization()));
+ writer.writeEndElement();
+
// </wsp:Policy>
writer.writeEndElement();
@@ -334,8 +341,17 @@
this.enableRMAnonURI = enableRMAnonURI;
}
+ public boolean isUseMessageSerialization() {
+ return useMessageSerialization;
+ }
+
+ public void setUseMessageSerialization(boolean useMessageSerialization) {
+ this.useMessageSerialization = useMessageSerialization;
+ }
+
public boolean equal(PolicyComponent policyComponent) {
// TODO
return false;
- }
+ }
+
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/policy/builders/RMAssertionBuilder.java Fri Feb 2 11:29:56 2007
@@ -148,6 +148,9 @@
propertyBean.setEnableRMAnonURI(Boolean.valueOf(data).booleanValue());
}
}
+ } else if (Sandesha2Constants.Assertions.ELEM_USE_SERIALIZATION.equals(name)) {
+ String value = element.getText().trim();
+ propertyBean.setUseMessageSerialization(Boolean.valueOf(value).booleanValue());
}
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java Fri Feb 2 11:29:56 2007
@@ -192,6 +192,9 @@
else if(bean.getOutOfOrderRanges() != null && !bean.getOutOfOrderRanges().equals(this.getOutOfOrderRanges()))
equal = false;
+
+ else if(bean.getToAddress() != null && !bean.getToAddress().equals(this.getToAddress()))
+ equal = false;
else if ((bean.rmdFlags & NEXT_MSG_NO_FLAG) != 0 && bean.getNextMsgNoToProcess() != this.getNextMsgNoToProcess())
equal = false;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Fri Feb 2 11:29:56 2007
@@ -17,18 +17,27 @@
package org.apache.sandesha2.storage.inmemory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.HashMap;
-import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisModule;
+import org.apache.axis2.transport.RequestResponseTransport;
+import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -48,7 +57,6 @@
private static InMemoryStorageManager instance = null;
private final String MESSAGE_MAP_KEY = "Sandesha2MessageMap";
- private final String ENVELOPE_MAP_KEY = "Sandesha2EnvelopeMap";
private RMSBeanMgr rMSBeanMgr = null;
private RMDBeanMgr rMDBeanMgr = null;
private SenderBeanMgr senderBeanMgr = null;
@@ -56,16 +64,23 @@
private Sender sender = null;
private Invoker invoker = null;
private HashMap transactions = new HashMap();
+ private boolean useSerialization = false;
- public InMemoryStorageManager(ConfigurationContext context) {
+ public InMemoryStorageManager(ConfigurationContext context)
+ throws SandeshaException
+ {
super(context);
+ SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
+ useSerialization = policy.isUseMessageSerialization();
+
this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
this.rMDBeanMgr = new InMemoryRMDBeanMgr (this, context);
this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
this.sender = new Sender();
this.invoker = new Invoker();
+
}
public Transaction getTransaction() {
@@ -155,7 +170,9 @@
}
public static InMemoryStorageManager getInstance(
- ConfigurationContext context) {
+ ConfigurationContext context)
+ throws SandeshaException
+ {
if (instance == null)
instance = new InMemoryStorageManager(context);
@@ -170,32 +187,55 @@
return null;
}
- MessageContext messageContext = (MessageContext) storageMap.get(key);
-
- HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
- if(envMap==null) {
- if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::retrieveMessageContext");
- return null;
- }
-
- //Get hold of the original SOAP envelope
- SOAPEnvelope envelope = (SOAPEnvelope)envMap.get(key);
-
- //Now clone the env and set it in the message context
- if (envelope!=null) {
- try {
- SOAPEnvelope clonedEnvelope = SandeshaUtil.cloneEnvelope(envelope);
- messageContext.setEnvelope(clonedEnvelope);
- } catch (AxisFault e) {
- throw new SandeshaStorageException (e);
+ MessageContext messageContext = null;
+ try {
+ if(useSerialization) {
+ SerializedStorageEntry entry = (SerializedStorageEntry) storageMap.get(key);
+
+ if(entry != null) {
+ ByteArrayInputStream stream = new ByteArrayInputStream(entry.data);
+ ObjectInputStream is = new ObjectInputStream(stream);
+ messageContext = (MessageContext) is.readObject();
+ messageContext.activate(entry.context);
+
+ OperationContext opCtx = messageContext.getOperationContext();
+ if(opCtx != null) {
+ MessageContext inMsgCtx = opCtx.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ if(inMsgCtx != null) {
+ inMsgCtx.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, entry.inTransportControl);
+ inMsgCtx.setProperty(MessageContext.TRANSPORT_OUT, entry.inTransportOut);
+ inMsgCtx.setProperty(Constants.OUT_TRANSPORT_INFO, entry.inTransportOutInfo);
+ }
+ }
+
+ messageContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, entry.transportControl);
+ messageContext.setProperty(MessageContext.TRANSPORT_OUT, entry.transportOut);
+ messageContext.setProperty(Constants.OUT_TRANSPORT_INFO, entry.transportOutInfo);
+ }
+
+ } else {
+ StorageEntry entry = (StorageEntry) storageMap.get(key);
+
+ if(entry != null) {
+ messageContext = entry.msgContext;
+ SOAPEnvelope clonedEnvelope = SandeshaUtil.cloneEnvelope(entry.envelope);
+ messageContext.setEnvelope(clonedEnvelope);
+ }
}
+ } catch (Exception e) {
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.failedToLoadMessage, e.toString());
+ if(log.isDebugEnabled()) log.debug(message);
+ throw new SandeshaStorageException(message, e);
}
-
+
if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::retrieveMessageContext, " + messageContext);
return messageContext;
}
- public void storeMessageContext(String key,MessageContext msgContext) {
+ public void storeMessageContext(String key,MessageContext msgContext)
+ throws SandeshaStorageException
+ {
if(log.isDebugEnabled()) log.debug("Enter: InMemoryStorageManager::storeMessageContext, key: " + key);
HashMap storageMap = (HashMap) getContext().getProperty(MESSAGE_MAP_KEY);
@@ -207,21 +247,46 @@
if (key==null)
key = SandeshaUtil.getUUID();
- storageMap.put(key,msgContext);
-
- //Now get hold of the SOAP envelope and store it in the env map
- HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-
- if(envMap==null) {
- envMap = new HashMap ();
- getContext().setProperty(ENVELOPE_MAP_KEY, envMap);
- }
-
- SOAPEnvelope envelope = msgContext.getEnvelope();
+ try {
+ if(useSerialization) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ ObjectOutputStream s = new ObjectOutputStream(stream);
+ s.writeObject(msgContext);
+ s.close();
+
+ SerializedStorageEntry entry = new SerializedStorageEntry();
+ entry.data = stream.toByteArray();
+ entry.context = msgContext.getConfigurationContext();
+
+ OperationContext opCtx = msgContext.getOperationContext();
+ if(opCtx != null) {
+ MessageContext inMsgCtx = opCtx.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+ if(inMsgCtx != null) {
+ entry.inTransportControl = inMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ entry.inTransportOut = inMsgCtx.getProperty(MessageContext.TRANSPORT_OUT);
+ entry.inTransportOutInfo = inMsgCtx.getProperty(Constants.OUT_TRANSPORT_INFO);
+ }
+ }
+ entry.transportControl = msgContext.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ entry.transportOut = msgContext.getProperty(MessageContext.TRANSPORT_OUT);
+ entry.transportOutInfo = msgContext.getProperty(Constants.OUT_TRANSPORT_INFO);
+
+ storageMap.put(key, entry);
- //We are storing the original envelope here.
- //Storing a cloned version will caus HeaderBlocks to loose their setProcessed information.
- envMap.put(key, envelope);
+ } else {
+ //We are storing the original envelope here.
+ //Storing a cloned version will caus HeaderBlocks to loose their setProcessed information.
+ StorageEntry entry = new StorageEntry();
+ entry.msgContext = msgContext;
+ entry.envelope = msgContext.getEnvelope();
+ storageMap.put(key,entry);
+ }
+ } catch(Exception e) {
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.failedToStoreMessage, e.toString());
+ if(log.isDebugEnabled()) log.debug(message);
+ throw new SandeshaStorageException(message, e);
+ }
if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::storeMessageContext, key: " + key);
}
@@ -236,17 +301,11 @@
SandeshaMessageKeys.storageMapNotPresent));
}
- Object oldEntry = storageMap.get(key);
+ Object oldEntry = storageMap.remove(key);
if (oldEntry==null)
throw new SandeshaStorageException (SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.entryNotPresentForUpdating));
- HashMap envMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-
- storageMap.remove(key);
- if (envMap!=null)
- envMap.remove(key);
-
storeMessageContext(key,msgContext);
if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::updateMessageContext, key: " + key);
@@ -256,15 +315,10 @@
if(log.isDebugEnabled()) log.debug("Enter: InMemoryStorageManager::removeMessageContext, key: " + key);
HashMap storageMap = (HashMap) getContext().getProperty(MESSAGE_MAP_KEY);
- HashMap envelopeMap = (HashMap) getContext().getProperty(ENVELOPE_MAP_KEY);
-
if (storageMap!=null)
storageMap.remove(key);
- if (envelopeMap!=null)
- envelopeMap.remove(key);
-
if(log.isDebugEnabled()) log.debug("Exit: InMemoryStorageManager::removeMessageContext, key: " + key);
}
@@ -272,6 +326,20 @@
}
+ private class SerializedStorageEntry {
+ byte[] data;
+ ConfigurationContext context;
+ Object transportControl;
+ Object transportOut;
+ Object transportOutInfo;
+ Object inTransportControl;
+ Object inTransportOut;
+ Object inTransportOutInfo;
+ }
+ private class StorageEntry {
+ MessageContext msgContext;
+ SOAPEnvelope envelope;
+ }
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Fri Feb 2 11:29:56 2007
@@ -98,7 +98,7 @@
}
try {
- if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + other);
+ if(log.isDebugEnabled()) log.debug("This " + this + " waiting for " + waitingForTran);
bean.wait();
} catch(InterruptedException e) {
// Do nothing
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Fri Feb 2 11:29:56 2007
@@ -284,29 +284,14 @@
- public static boolean verifySequenceCompletion(Iterator ackRangesIterator, long lastMessageNo) {
+ public static boolean verifySequenceCompletion(RangeString ackRanges, long lastMessageNo) {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::verifySequenceCompletion");
- HashMap startMap = new HashMap();
-
- while (ackRangesIterator.hasNext()) {
- AcknowledgementRange temp = (AcknowledgementRange) ackRangesIterator.next();
- startMap.put(new Long(temp.getLowerValue()), temp);
- }
-
- long start = 1;
boolean result = false;
- while (!result) {
- AcknowledgementRange temp = (AcknowledgementRange) startMap.get(new Long(start));
- if (temp == null) {
- break;
- }
-
- if (temp.getUpperValue() >= lastMessageNo)
- result = true;
-
- start = temp.getUpperValue() + 1;
+ Range complete = new Range(1, lastMessageNo);
+ if(ackRanges.isRangeCompleted(complete)) {
+ result = true;
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/PropertyManager.java Fri Feb 2 11:29:56 2007
@@ -66,7 +66,7 @@
propertyBean.setEnableMakeConnection(Sandesha2Constants.Properties.DefaultValues.EnableMakeConnection);
propertyBean.setEnableRMAnonURI(Sandesha2Constants.Properties.DefaultValues.EnableRMAnonURI);
-
+ propertyBean.setUseMessageSerialization(Sandesha2Constants.Properties.DefaultValues.UseMessageSerialization);
return propertyBean;
}
@@ -115,6 +115,9 @@
String enableMakeConnection = properties.getProperty(Sandesha2Constants.Properties.EnableMakeConnection);
loadEnableMakeConnection(enableMakeConnection, propertyBean);
+ String useSerlialization = properties.getProperty(Sandesha2Constants.Properties.UseMessageSerialization);
+ loadUseSerialization(useSerlialization, propertyBean);
+
String messageTypesToDrop = properties.getProperty(Sandesha2Constants.Properties.MessageTypesToDrop);
loadMessageTypesToDrop(messageTypesToDrop, propertyBean);
@@ -173,6 +176,10 @@
String enableMakeConnection = (String) enableMakeConnectionParam.getValue();
loadEnableMakeConnection(enableMakeConnection, propertyBean);
+ Parameter useSerializationParam = desc.getParameter(Sandesha2Constants.Properties.UseMessageSerialization);
+ String useSerialization = (String) useSerializationParam.getValue();
+ loadUseSerialization(useSerialization, propertyBean);
+
Parameter messageTypesToDropParam = desc.getParameter(Sandesha2Constants.Properties.MessageTypesToDrop);
String messageTypesToDrop = (String) messageTypesToDropParam.getValue();
loadMessageTypesToDrop(messageTypesToDrop, propertyBean);
@@ -546,6 +553,19 @@
propertyBean.setEnableMakeConnection(true);
} else if (enableMakeConnection.equalsIgnoreCase("false")) {
propertyBean.setEnableMakeConnection(false);
+ }
+ }
+ }
+
+ private static void loadUseSerialization(String useSerialization, SandeshaPolicyBean propertyBean)
+ throws SandeshaException {
+
+ if (useSerialization != null) {
+ useSerialization = useSerialization.trim();
+ if (useSerialization.equalsIgnoreCase("true")) {
+ propertyBean.setUseMessageSerialization(true);
+ } else if (useSerialization.equalsIgnoreCase("false")) {
+ propertyBean.setUseMessageSerialization(false);
}
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Fri Feb 2 11:29:56 2007
@@ -537,9 +537,6 @@
}
}
-
- newMessageContext.setExecutionChain(referenceMessage.getExecutionChain());
-
return newMessageContext;
} catch (AxisFault e) {
@@ -967,44 +964,58 @@
MessageContext msgContext = rmMsgContext.getMessageContext();
ConfigurationContext configurationContext = msgContext.getConfigurationContext();
-
- // message will be stored in the Sandesha2TransportSender
- msgContext.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
-
- TransportOutDescription transportOut = msgContext.getTransportOut();
-
- msgContext.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
- msgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
- Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
- msgContext.setTransportOut(sandesha2TransportOutDesc);
+ SandeshaPolicyBean policy = getPropertyBean(msgContext.getAxisOperation());
+ if(policy.isUseMessageSerialization()) {
+ msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_TRUE);
- // sending the message once through Sandesha2TransportSender.
- AxisEngine engine = new AxisEngine(configurationContext);
-
- if (msgContext.isPaused())
- engine.resumeSend(msgContext);
- else {
- //this invocation has to be a blocking one.
-
- Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
- if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
- msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
-
- engine.send(msgContext);
+ StorageManager store = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
+ store.storeMessageContext(storageKey, msgContext);
- msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+ } else {
+ // message will be stored in the Sandesha2TransportSender
+ msgContext.setProperty(Sandesha2Constants.MESSAGE_STORE_KEY, storageKey);
+
+ TransportOutDescription transportOut = msgContext.getTransportOut();
+
+ msgContext.setProperty(Sandesha2Constants.ORIGINAL_TRANSPORT_OUT_DESC, transportOut);
+ msgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
+
+ Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
+ msgContext.setTransportOut(sandesha2TransportOutDesc);
+
+ // sending the message once through Sandesha2TransportSender.
+ AxisEngine engine = new AxisEngine(configurationContext);
+
+ if (msgContext.isPaused())
+ engine.resumeSend(msgContext);
+ else {
+ //this invocation has to be a blocking one.
+
+ Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+ if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+
+ engine.send(msgContext);
+
+ msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+ }
}
-
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::executeAndStore");
}
- public static void modifyExecutionChainForStoring (MessageContext message) {
+ public static void modifyExecutionChainForStoring (MessageContext message)
+ throws SandeshaException
+ {
Object property = message.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
if (property!=null)
return; //Phases are already set. Dont hv to redo.
+
+ SandeshaPolicyBean policy = getPropertyBean(message.getAxisOperation());
+ if(policy.isUseMessageSerialization())
+ return; // No need to mess with the transport when we use message serialization
TransportOutDescription transportOutDescription = message.getTransportOut();
if (!(transportOutDescription instanceof Sandesha2TransportOutDesc))
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/TerminateManager.java Fri Feb 2 11:29:56 2007
@@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
@@ -33,6 +34,7 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -57,6 +59,63 @@
public static HashMap receivingSideCleanMap = new HashMap();
+ public static void checkAndTerminate(RMMsgContext relatedMessage, StorageManager storageManager, RMSBean rmsBean)
+ throws SandeshaStorageException, AxisFault {
+ if(log.isDebugEnabled()) log.debug("Entry: TerminateManager::checkAndTerminate");
+
+ long lastOutMessage = rmsBean.getLastOutMessage ();
+
+ if (lastOutMessage > 0 && !rmsBean.isTerminateAdded()) {
+
+ boolean complete = AcknowledgementManager.verifySequenceCompletion(rmsBean.getClientCompletedMessages(), lastOutMessage);
+
+ //If this is RM 1.1 and RMAnonURI scenario, dont do the termination unless the response side createSequence has been
+ //received (RMDBean has been created) through polling, in this case termination will happen in the create sequence response processor.
+ String rmVersion = rmsBean.getRMVersion();
+ String replyToAddress = rmsBean.getReplyToEPR();
+
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(rmVersion) && SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+ RMDBean findBean = new RMDBean ();
+ findBean.setPollingMode(true);
+ findBean.setToAddress(replyToAddress);
+
+ RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
+ List beans = rmdBeanMgr.find(findBean);
+ System.out.println("Checking replyTo" + replyToAddress);
+ if(beans.isEmpty()) {
+ System.out.println("No beans.");
+ rmsBean.setTerminationPauserForCS(true);
+ storageManager.getRMSBeanMgr().update(rmsBean);
+ complete = false;
+ } else {
+ System.out.println("Matcher:\n" + findBean + "\nFound beans:\n" + beans);
+ }
+ }
+
+ // If we are doing sync 2-way over WSRM 1.0 then we may need to keep sending messages,
+ // so check to see if all the senders have been removed
+ EndpointReference replyTo = new EndpointReference (replyToAddress);
+ if (complete &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(rmVersion) && replyTo.hasAnonymousAddress()) {
+ SenderBean matcher = new SenderBean();
+ matcher.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
+ matcher.setSequenceID(rmsBean.getSequenceID());
+
+ List matches = storageManager.getSenderBeanMgr().find(matcher);
+ if(!matches.isEmpty()) complete = false;
+ }
+
+ if (complete) {
+ addTerminateSequenceMessage(relatedMessage, rmsBean.getInternalSequenceID(), rmsBean.getSequenceID(), storageManager);
+ }
+
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: TerminateManager::checkAndTerminate");
+ }
+
+
/**
* Called by the receiving side to remove data related to a sequence. e.g.
* After sending the TerminateSequence message. Calling this methods will
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Fri Feb 2 11:29:56 2007
@@ -27,6 +27,7 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -127,12 +128,12 @@
// or the message can't go anywhere. If there is nothing here then we leave the
// message in the sender queue, and a MakeConnection (or a retransmitted request)
// will hopefully pick it up soon.
+ RequestResponseTransport t = null;
Boolean makeConnection = (Boolean) msgCtx.getProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE);
EndpointReference toEPR = msgCtx.getTo();
if(toEPR.hasAnonymousAddress() &&
(makeConnection == null || !makeConnection.booleanValue())) {
- RequestResponseTransport t = null;
MessageContext inMsg = null;
OperationContext op = msgCtx.getOperationContext();
if (op != null)
@@ -185,37 +186,49 @@
}
try {
-
- // had to fully build the SOAP envelope to support
- // retransmissions.
- // Otherwise a 'parserAlreadyAccessed' exception could
- // get thrown in retransmissions.
- // But this has a performance reduction.
- msgCtx.getEnvelope().build();
-
- ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
- if (retransmittablePhases!=null) {
- msgCtx.setExecutionChain(retransmittablePhases);
+ AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
+ InvocationResponse response = InvocationResponse.CONTINUE;
+
+ SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
+ if(policy.isUseMessageSerialization()) {
+ if(msgCtx.isPaused()) {
+ if (log.isDebugEnabled())
+ log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+ msgCtx.setPaused(false);
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+ response = engine.resumeSend(msgCtx);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Sending a message : " + msgCtx.getEnvelope().getHeader());
+ msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
+ engine.send(msgCtx); // TODO check if this should return an invocation response
+ }
} else {
- ArrayList emptyExecutionChain = new ArrayList ();
- msgCtx.setExecutionChain(emptyExecutionChain);
- }
+ // had to fully build the SOAP envelope to support
+ // retransmissions.
+ // Otherwise a 'parserAlreadyAccessed' exception could
+ // get thrown in retransmissions.
+ // But this has a performance reduction.
+ msgCtx.getEnvelope().build();
+
+ ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
+ if (retransmittablePhases!=null) {
+ msgCtx.setExecutionChain(retransmittablePhases);
+ } else {
+ ArrayList emptyExecutionChain = new ArrayList ();
+ msgCtx.setExecutionChain(emptyExecutionChain);
+ }
+
+ msgCtx.setCurrentHandlerIndex(0);
+ msgCtx.setCurrentPhaseIndex(0);
+ msgCtx.setPaused(false);
- msgCtx.setCurrentHandlerIndex(0);
- msgCtx.setCurrentPhaseIndex(0);
- msgCtx.setPaused(false);
-
- AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
- if (log.isDebugEnabled())
- log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
- InvocationResponse response = engine.resumeSend(msgCtx);
+ if (log.isDebugEnabled())
+ log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+ response = engine.resumeSend(msgCtx);
+ }
if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
if(response != InvocationResponse.SUSPEND) {
- RequestResponseTransport t = null;
- MessageContext inMsg = null;
- OperationContext op = msgCtx.getOperationContext();
- if(op != null) inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- if(inMsg != null) t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
if(t != null) {
if(log.isDebugEnabled()) log.debug("Signalling transport in " + t);
if(t != null) t.signalResponseReady();
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/RMScenariosTest.java Fri Feb 2 11:29:56 2007
@@ -42,8 +42,7 @@
private static boolean serverStarted = false;
private static ConfigurationContext configContext = null;
- private int serverPort = DEFAULT_SERVER_TEST_PORT;
- private String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+ protected String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
protected String repoPath = "target" + File.separator + "repos" + File.separator + "server";
protected String axis2_xml = "target" + File.separator + "repos" + File.separator + "server" + File.separator + "server_axis2.xml";
Added: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java?view=auto&rev=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java (added)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java Fri Feb 2 11:29:56 2007
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2007 The Apache Software Foundation.
+ * Copyright 2007 International Business Machines Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sandesha2.scenarios;
+
+import java.io.File;
+
+public class SerializableScenariosTest extends RMScenariosTest {
+
+ public SerializableScenariosTest() {
+ super("SerializableScenariosTest");
+ this.repoPath = "target" + File.separator + "repos" + File.separator + "serialize-server";
+ this.axis2_xml = repoPath + File.separator + "server_axis2.xml";
+
+ this.repoPathClient = "target" + File.separator + "repos" + File.separator + "serialize-client";
+ this.axis2_xmlClient = repoPathClient + File.separator + "client_axis2.xml";
+ }
+
+}
Propchange: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/SerializableScenariosTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java?view=diff&rev=502698&r1=502697&r2=502698
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java (original)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/security/UnitTestSecurityManager.java Fri Feb 2 11:29:56 2007
@@ -115,7 +115,11 @@
}
}
}
- if(!foundToken) throw new SandeshaException("Message was not secured with the correct token(s)");
+ if(!foundToken) {
+ SandeshaException e = new SandeshaException("Message was not secured with the correct token(s)");
+ e.printStackTrace(System.err);
+ throw e;
+ }
log.debug("Exit: UnitTestSecurityManager::checkProofOfPossession");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org