You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/15 15:20:24 UTC
svn commit: r507936 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: ./ handlers/
msgprocessors/ polling/ storage/beans/ util/ workers/
Author: mlovett
Date: Thu Feb 15 06:20:22 2007
New Revision: 507936
URL: http://svn.apache.org/viewvc?view=rev&rev=507936
Log:
Improve performance of MakeConnection by polling less often, and paying attention to MessagePending headers
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java (with props)
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Thu Feb 15 06:20:22 2007
@@ -502,6 +502,7 @@
static final String INBOUND_SEQUENCE_ID = "Sandesha2InboundSequenceId";
static final String INBOUND_MESSAGE_NUMBER = "Sandesha2InboundMessageNumber";
static final String INBOUND_LAST_MESSAGE = "Sandesha2InboundLastMessage";
+ static final String MAKECONNECTION_ENTRY = "Sandesha2MakeConnectionEntry";
}
public interface Assertions {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaInHandler.java Thu Feb 15 06:20:22 2007
@@ -34,6 +34,7 @@
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.msgprocessors.AckRequestedProcessor;
import org.apache.sandesha2.msgprocessors.AcknowledgementProcessor;
+import org.apache.sandesha2.msgprocessors.MessagePendingProcessor;
import org.apache.sandesha2.msgprocessors.SequenceProcessor;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.storage.StorageManager;
@@ -115,6 +116,10 @@
if(reqProcessor.processAckRequestedHeaders(rmMsgCtx)){
returnValue = InvocationResponse.SUSPEND;
}
+
+ // Process MessagePending headers
+ MessagePendingProcessor pendingProcessor = new MessagePendingProcessor();
+ pendingProcessor.processMessagePendingHeaders(rmMsgCtx);
// Process the Sequence header, if there is one
SequenceProcessor seqProcessor = new SequenceProcessor();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Thu Feb 15 06:20:22 2007
@@ -20,13 +20,11 @@
import org.apache.axiom.soap.SOAPBody;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.addressing.RelatesTo;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.logging.Log;
@@ -38,7 +36,6 @@
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.policy.SandeshaPolicyBean;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
@@ -90,15 +87,13 @@
// Re-write the WS-A anonymous URI, if we support the RM anonymous URI. We only
// need to rewrite the replyTo EPR if we have an out-in MEP.
- SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
- if(policy.isEnableRMAnonURI()) {
- AxisOperation op = msgContext.getAxisOperation();
- if(op != null) {
- int mep = op.getAxisSpecifMEPConstant();
- if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
- EndpointReference replyTo = rewriteEPR(msgContext.getReplyTo(), msgContext);
- msgContext.setReplyTo(replyTo);
- }
+ AxisOperation op = msgContext.getAxisOperation();
+ int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+ if(op != null) {
+ mep = op.getAxisSpecifMEPConstant();
+ if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+ EndpointReference replyTo = SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
+ msgContext.setReplyTo(replyTo);
}
}
@@ -279,9 +274,14 @@
// set this as the response highest message.
rmsBean.setHighestOutMessageNumber(messageNumber);
- // saving the used message number
- if (!dummyMessage)
+ // saving the used message number, and the expected reply count
+ if (!dummyMessage) {
rmsBean.setNextMessageNumber(messageNumber);
+ if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
+ long expectedReplies = rmsBean.getExpectedReplies();
+ rmsBean.setExpectedReplies(expectedReplies + 1);
+ }
+ }
RelatesTo relatesTo = msgContext.getRelatesTo();
if(relatesTo != null) {
@@ -491,45 +491,6 @@
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
- }
-
- private EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
- throws SandeshaException
- {
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::rewriteEPR " + epr);
-
- // Handle EPRs that have not yet been set. These are effectively WS-A anon, and therefore
- // we can rewrite them.
- if(epr == null) epr = new EndpointReference(null);
-
- String address = epr.getAddress();
- if(address == null ||
- AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
- AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
- // We use the service context to co-ordinate the RM anon uuid, so that several
- // invocations of the same target will yield stable replyTo addresses.
- String uuid = null;
- ServiceContext sc = mc.getServiceContext();
- if(sc == null) {
- String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
- throw new SandeshaException(msg);
- }
- synchronized (sc) {
- uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
- if(uuid == null) {
- uuid = SandeshaUtil.getUUID();
- sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
- }
- }
-
- if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
- epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: SandeshaOutHandler::rewriteEPR " + epr);
- return epr;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Thu Feb 15 06:20:22 2007
@@ -164,6 +164,7 @@
rMDBean.setAcksToEPR(acksToEPR.getAddress());
rMDBean.setSequenceID(rmsBean.getOfferedSequence());
rMDBean.setNextMsgNoToProcess(1);
+ rMDBean.setOutboundSequence(rmsBean.getSequenceID());
//Storing the referenceMessage of the sending side sequence as the reference message
//of the receiving side as well.
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Thu Feb 15 06:20:22 2007
@@ -10,7 +10,6 @@
import org.apache.axis2.context.ContextFactory;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
-import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.commons.logging.Log;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Thu Feb 15 06:20:22 2007
@@ -2,43 +2,35 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SequenceEntry;
import org.apache.sandesha2.wsrm.MessagePending;
-import org.apache.sandesha2.wsrm.Sequence;
public class MessagePendingProcessor {
private static final Log log = LogFactory.getLog(MessagePendingProcessor.class);
- public boolean processMessagePendingHeaders (MessageContext message) throws AxisFault {
+ public void processMessagePendingHeaders(RMMsgContext message) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: MessagePendingProcessor::processMessagePendingHeaders");
- boolean messagePaused = false;
-
- RMMsgContext rmMsgContext = MsgInitializer.initializeMessage(message);
- Sequence sequence = (Sequence) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
- MessagePending messagePending = (MessagePending) rmMsgContext.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
-
- if (sequence!=null) {
- String sequenceId = sequence.getIdentifier().getIdentifier();
-
- if (messagePending!=null) {
- boolean pending = messagePending.isPending();
- if (pending) {
- ConfigurationContext context = rmMsgContext.getConfigurationContext();
+ MessagePending messagePending = (MessagePending) message.getMessagePart(Sandesha2Constants.MessageParts.MESSAGE_PENDING);
+ if (messagePending!=null) {
+ boolean pending = messagePending.isPending();
+ if (pending) {
+ SequenceEntry entry = (SequenceEntry) message.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+ if(entry != null) {
+ ConfigurationContext context = message.getConfigurationContext();
StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
PollingManager pollingManager = storage.getPollingManager();
- if(pollingManager != null) pollingManager.schedulePollingRequest(sequenceId, false);
+ if(pollingManager != null) pollingManager.schedulePollingRequest(entry.getSequenceId(), entry.isRmSource());
}
}
}
@@ -47,8 +39,6 @@
if (log.isDebugEnabled())
log.debug("Exit: MessagePendingProcessor::processMessagePendingHeaders");
-
- return messagePaused;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Thu Feb 15 06:20:22 2007
@@ -43,6 +43,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.InvokerBean;
import org.apache.sandesha2.storage.beans.RMDBean;
@@ -263,6 +264,18 @@
if (!msgNoPresentInList)
{
serverCompletedMessageRanges.addRange(new Range(msgNo));
+ }
+
+ // If the message is a reply to an outbound message then we can update the RMSBean that
+ // matches.
+ String outboundSequence = bean.getOutboundSequence();
+ if(outboundSequence != null) {
+ RMSBean outBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outboundSequence);
+ if(outBean != null && outBean.getExpectedReplies() > 0 ) {
+ outBean.setExpectedReplies(outBean.getExpectedReplies() - 1);
+ RMSBeanMgr outMgr = storageManager.getRMSBeanMgr();
+ outMgr.update(outBean);
+ }
}
// Update the RMD bean
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Thu Feb 15 06:20:22 2007
@@ -23,6 +23,7 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
@@ -37,10 +38,12 @@
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMSequenceBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SequenceEntry;
/**
* This class is responsible for sending MakeConnection requests. This is a seperate thread that
@@ -68,11 +71,14 @@
Transaction t = null;
try {
// If we have request scheduled, handle them first, and then pick
- // pick a sequence using a round-robin approach.
+ // pick a sequence using a round-robin approach. Scheduled polls
+ // bypass the normal polling checks, to make sure that they happen
+ boolean forcePoll = false;
SequenceEntry entry = null;
synchronized (this) {
if(!scheduledPollingRequests.isEmpty()) {
entry = (SequenceEntry) scheduledPollingRequests.removeFirst();
+ forcePoll = true;
}
}
if(entry == null) {
@@ -93,9 +99,9 @@
t = storageManager.getTransaction();
if(entry.isRmSource()) {
- pollRMSSide(entry);
+ pollRMSSide(entry, forcePoll);
} else {
- pollRMDSide(entry);
+ pollRMDSide(entry, forcePoll);
}
if(t != null) t.commit();
t = null;
@@ -115,8 +121,8 @@
return false;
}
- private void pollRMSSide(SequenceEntry entry) throws AxisFault {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide");
+ private void pollRMSSide(SequenceEntry entry, boolean force) throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide, force: " + force);
RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
RMSBean findRMS = new RMSBean();
@@ -129,14 +135,19 @@
// This sequence must have been terminated, or deleted
stopThreadForSequence(entry.getSequenceId(), true);
} else {
- pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
+ // The sequence is there, but we still only poll if we are expecting reply messages,
+ // or if we don't have clean ack state.
+ boolean cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
+ long repliesExpected = beanToPoll.getExpectedReplies();
+ if(force || !cleanAcks || repliesExpected > 0)
+ pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
}
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
}
- private void pollRMDSide(SequenceEntry entry) throws AxisFault {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide");
+ private void pollRMDSide(SequenceEntry entry, boolean force) throws AxisFault {
+ if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide, force: " + force);
RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
RMDBean findBean = new RMDBean();
findBean.setPollingMode(true);
@@ -148,7 +159,22 @@
// This sequence must have been terminated, or deleted
stopThreadForSequence(entry.getSequenceId(), false);
} else {
- pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
+ // The sequence is still there, but if we have a running related sequence
+ // that is not expecting replies then there is no need to poll.
+ boolean doPoll = true;
+ String outboundSequence = nextMsgBean.getOutboundSequence();
+ if(outboundSequence != null) {
+ RMSBean findRMS = new RMSBean();
+ findRMS.setSequenceID(outboundSequence);
+ findRMS.setTerminated(false);
+ RMSBeanMgr mgr = storageManager.getRMSBeanMgr();
+ RMSBean outbound = mgr.findUnique(findRMS);
+ if(outbound != null && outbound.getExpectedReplies() == 0) {
+ doPoll = false;
+ }
+ }
+ if(force || doPoll)
+ pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
}
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
@@ -157,25 +183,34 @@
private void pollForSequence(String sequenceId,
String sequencePropertyKey,
String referenceMsgKey,
- RMSequenceBean rmBean)
+ RMSequenceBean rmBean,
+ SequenceEntry entry)
throws SandeshaException, SandeshaStorageException, AxisFault
{
if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey + ", " + referenceMsgKey + ", " + rmBean);
//create a MakeConnection message
String replyTo = rmBean.getReplyToEPR();
- String WSRMAnonReplyToURI = null;
+ String wireSeqId = null;
+ String wireAddress = null;
if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
// If we are polling on a RM anon URI then we don't want to include the sequence id
// in the MakeConnection message.
- sequenceId = null;
- WSRMAnonReplyToURI = replyTo;
+ wireAddress = replyTo;
+ } else {
+ wireSeqId = sequenceId;
}
MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
- rmBean, sequenceId, WSRMAnonReplyToURI, storageManager);
+ rmBean, wireSeqId, wireAddress, storageManager);
+
+ // Store properties so that we know which sequence we are polling for. This can be used
+ // to match reply sequences up to requests, as well as to help process messagePending
+ // headers.
+ OperationContext ctx = makeConnectionRMMessage.getMessageContext().getOperationContext();
+ ctx.setProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY, entry);
makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
//storing the MakeConnection message.
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMDBean.java Thu Feb 15 06:20:22 2007
@@ -58,6 +58,12 @@
* To Address of the messages that will be received for this sequence.
*/
private String toAddress;
+
+ /**
+ * Client side, we keep track of inbound and outbound sequence pairs. Each
+ * inbound sequence has the identifier of the associated outbound sequence.
+ */
+ private String outboundSequence;
/**
* Comment for <code>nextMsgNoToProcess</code>
@@ -157,6 +163,15 @@
this.toAddress = toAddress;
}
+ public String getOutboundSequence() {
+ return outboundSequence;
+ }
+
+ public void setOutboundSequence(String outboundSequence) {
+ this.outboundSequence = outboundSequence;
+ }
+
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -195,6 +210,9 @@
equal = false;
else if(bean.getToAddress() != null && !bean.getToAddress().equals(this.getToAddress()))
+ equal = false;
+
+ else if(bean.getOutboundSequence() != null && !bean.getOutboundSequence().equals(this.getOutboundSequence()))
equal = false;
else if ((bean.rmdFlags & NEXT_MSG_NO_FLAG) != 0 && bean.getNextMsgNoToProcess() != this.getNextMsgNoToProcess())
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java Thu Feb 15 06:20:22 2007
@@ -122,6 +122,11 @@
private long numberOfMessagesAcked = 0;
/**
+ * The number of reply messages that we expect
+ */
+ private long expectedReplies = 0;
+
+ /**
* Flags that are used to check if the primitive types on this bean
* have been set. If a primitive type has not been set then it will
* be ignored within the match method.
@@ -136,6 +141,7 @@
private static final int SEQ_CLOSED_CLIENT_FLAG = 0x01000000;
private static final int ACKED_MESSAGES_FLAG = 0x10000000;
private static final int TERM_PAUSER_FOR_CS = 0x00000002;
+ private static final int EXPECTED_REPLIES = 0x00000020;
/**
* In WSRM Anon URI scenario, we may not want to terminate a perticular sequence until the CreateSequence has been received
@@ -310,6 +316,15 @@
}
+ public long getExpectedReplies() {
+ return expectedReplies;
+ }
+
+ public void setExpectedReplies(long expectedReplies) {
+ this.expectedReplies = expectedReplies;
+ this.rmsFlags |= EXPECTED_REPLIES;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -327,6 +342,7 @@
result.append("\nTimedOut : "); result.append(timedOut);
result.append("\nClosedClient : "); result.append(sequenceClosedClient);
result.append("\nNumAckedMsgs : "); result.append(numberOfMessagesAcked);
+ result.append("\nExpectedReplies : "); result.append(expectedReplies);
result.append("\nTransportTo : "); result.append(transportTo);
result.append("\nOfferedEndPoint : "); result.append(offeredEndPoint);
result.append("\nOfferedSequence : "); result.append(offeredSequence);
@@ -403,7 +419,11 @@
else if((bean.rmsFlags & TERM_PAUSER_FOR_CS) != 0 && bean.isTerminationPauserForCS() != this.isTerminationPauserForCS())
match = false;
-
+
+ else if((bean.rmsFlags & EXPECTED_REPLIES) != 0 && bean.getExpectedReplies() != this.getExpectedReplies())
+ match = false;
+
return match;
}
+
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Thu Feb 15 06:20:22 2007
@@ -941,4 +941,53 @@
String stackTrace = baos.toString();
return stackTrace;
}
+
+ public static EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
+ throws SandeshaException
+ {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
+
+ ConfigurationContext configContext = mc.getConfigurationContext();
+ SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
+ if(!policy.isEnableRMAnonURI()) {
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaUtil::rewriteEPR, anon uri is disabled");
+ return epr;
+ }
+
+ // Handle EPRs that have not yet been set. These are effectively WS-A anon, and therefore
+ // we can rewrite them.
+ if(epr == null) epr = new EndpointReference(null);
+
+ String address = epr.getAddress();
+ if(address == null ||
+ AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
+ AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
+ // We use the service context to co-ordinate the RM anon uuid, so that several
+ // invocations of the same target will yield stable replyTo addresses.
+ String uuid = null;
+ ServiceContext sc = mc.getServiceContext();
+ if(sc == null) {
+ String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
+ throw new SandeshaException(msg);
+ }
+ synchronized (sc) {
+ uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
+ if(uuid == null) {
+ uuid = SandeshaUtil.getUUID();
+ sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
+ }
+ }
+
+ if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
+ epr.setAddress(Sandesha2Constants.SPEC_2006_08.ANONYMOUS_URI_PREFIX + uuid);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaUtil::rewriteEPR " + epr);
+ return epr;
+ }
+
+
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Thu Feb 15 06:20:22 2007
@@ -31,6 +31,7 @@
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
+import org.apache.sandesha2.workers.SequenceEntry;
import org.apache.sandesha2.wsrm.CreateSequence;
/**
@@ -82,6 +83,13 @@
}
MessageContext createSeqContext = createSequenceMsg.getMessageContext();
+
+ // If this create is the result of a MakeConnection, then we must have a related
+ // outbound sequence.
+ SequenceEntry entry = (SequenceEntry) createSeqContext.getProperty(Sandesha2Constants.MessageContextProperties.MAKECONNECTION_ENTRY);
+ if(entry != null && entry.isRmSource()) {
+ rmdBean.setOutboundSequence(entry.getSequenceId());
+ }
rmdBean.setServerCompletedMessages(new RangeString());
@@ -260,7 +268,11 @@
}
}
}
- // Store both the acksTo and replyTo
+ // In case either of the replyTo or AcksTo is anonymous, rewrite them using the AnonURI template
+ replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR, firstAplicationMsgCtx);
+ acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR, firstAplicationMsgCtx);
+
+ // Store both the acksTo and replyTo
if(replyToEPR != null) rmsBean.setReplyToEPR(replyToEPR.getAddress());
if(acksToEPR != null) rmsBean.setAcksToEPR(acksToEPR.getAddress());
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=507936&r1=507935&r2=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Thu Feb 15 06:20:22 2007
@@ -299,42 +299,4 @@
}
}
}
-
- protected class SequenceEntry {
- private String sequenceId;
- private boolean rmSource;
-
- public SequenceEntry(String sequenceId, boolean rmSource) {
- this.sequenceId = sequenceId;
- this.rmSource = rmSource;
- }
- public boolean isRmSource() {
- return rmSource;
- }
- public String getSequenceId() {
- return sequenceId;
- }
-
-
- public boolean equals(Object o) {
- if(o == null) return false;
- if(o == this) return true;
- if(o.getClass() != getClass()) return false;
-
- SequenceEntry other = (SequenceEntry) o;
- if(sequenceId != null) {
- if(!sequenceId.equals(other.sequenceId)) return false;
- } else {
- if(other.sequenceId != null) return false;
- }
-
- return rmSource == other.rmSource;
- }
- public int hashCode() {
- int result = 1;
- if(sequenceId != null) result = sequenceId.hashCode();
- if(rmSource) result = -result;
- return result;
- }
- }
}
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java?view=auto&rev=507936
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java Thu Feb 15 06:20:22 2007
@@ -0,0 +1,59 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.workers;
+
+import java.io.Serializable;
+
+public class SequenceEntry implements Serializable {
+ private static final long serialVersionUID = -6823171634616402792L;
+
+ private String sequenceId;
+ private boolean rmSource;
+
+ public SequenceEntry(String sequenceId, boolean rmSource) {
+ this.sequenceId = sequenceId;
+ this.rmSource = rmSource;
+ }
+ public boolean isRmSource() {
+ return rmSource;
+ }
+ public String getSequenceId() {
+ return sequenceId;
+ }
+
+
+ public boolean equals(Object o) {
+ if(o == null) return false;
+ if(o == this) return true;
+ if(o.getClass() != getClass()) return false;
+
+ SequenceEntry other = (SequenceEntry) o;
+ if(sequenceId != null) {
+ if(!sequenceId.equals(other.sequenceId)) return false;
+ } else {
+ if(other.sequenceId != null) return false;
+ }
+
+ return rmSource == other.rmSource;
+ }
+ public int hashCode() {
+ int result = 1;
+ if(sequenceId != null) result = sequenceId.hashCode();
+ if(rmSource) result = -result;
+ return result;
+ }
+}
Propchange: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SequenceEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org