You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/28 12:23:00 UTC
svn commit: r512708 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2:
Sandesha2Constants.java msgprocessors/ApplicationMsgProcessor.java
polling/PollingManager.java storage/beans/RMSBean.java
util/SandeshaUtil.java util/SequenceManager.java
Author: mlovett
Date: Wed Feb 28 03:22:57 2007
New Revision: 512708
URL: http://svn.apache.org/viewvc?view=rev&rev=512708
Log:
Put the uuid associated with an anonymous endpoint into the RMSBean, instead of on the Service Context
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Wed Feb 28 03:22:57 2007
@@ -511,8 +511,6 @@
static final String RETRANSMITTABLE_PHASES = "RMRetransmittablePhases";
- static final String RM_ANON_UUID = "RMAnonymousUUID";
-
static final String propertiesToCopyFromReferenceMessage = "propertiesToCopyFromReferenceMessage";
static final String propertiesToCopyFromReferenceRequestMessage = "propertiesToCopyFromReferenceRequestMessage";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Wed Feb 28 03:22:57 2007
@@ -86,18 +86,6 @@
MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configContext = msgContext.getConfigurationContext();
- // Re-write the WS-A anonymous URI, if we support the RM anonymous URI. We only
- // need to rewrite the replyTo EPR if we have an out-in MEP.
- AxisOperation op = msgContext.getAxisOperation();
- int mep = WSDLConstants.MEP_CONSTANT_INVALID;
- if(op != null) {
- mep = op.getAxisSpecifMEPConstant();
- if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
- EndpointReference replyTo = SandeshaUtil.rewriteEPR(msgContext.getReplyTo(), msgContext);
- msgContext.setReplyTo(replyTo);
- }
- }
-
// setting the Fault callback
SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
SandeshaClientConstants.SANDESHA_LISTENER);
@@ -273,11 +261,36 @@
rmsBean.setHighestOutMessageNumber(messageNumber);
// saving the used message number, and the expected reply count
+ boolean startPolling = false;
if (!dummyMessage) {
rmsBean.setNextMessageNumber(messageNumber);
+
+ // Identify the MEP associated with the message.
+ AxisOperation op = msgContext.getAxisOperation();
+ int mep = WSDLConstants.MEP_CONSTANT_INVALID;
+ if(op != null) {
+ mep = op.getAxisSpecifMEPConstant();
+ }
+
if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
long expectedReplies = rmsBean.getExpectedReplies();
rmsBean.setExpectedReplies(expectedReplies + 1);
+
+ // If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
+ EndpointReference oldEndpoint = msgContext.getReplyTo();
+ String oldAddress = (oldEndpoint == null) ? null : oldEndpoint.getAddress();
+ EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext.getReplyTo(), configContext);
+ String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
+ if(newAddress != null && !newAddress.equals(oldAddress)) {
+ // We have rewritten the replyTo. If this is the first message that we have needed to
+ // rewrite then we should set the sequence up for polling, and once we have saved the
+ // changes to the sequence then we can start the polling thread.
+ msgContext.setReplyTo(newReplyTo);
+ if(!rmsBean.isPollingMode()) {
+ rmsBean.setPollingMode(true);
+ startPolling = true;
+ }
+ }
}
}
@@ -295,6 +308,10 @@
// Update the rmsBean
storageManager.getRMSBeanMgr().update(rmsBean);
+
+ if(startPolling) {
+ SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
+ }
SOAPEnvelope env = rmMsgCtx.getSOAPEnvelope();
if (env == null) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Wed Feb 28 03:22:57 2007
@@ -140,7 +140,7 @@
boolean cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
long repliesExpected = beanToPoll.getExpectedReplies();
if(force || !cleanAcks || repliesExpected > 0)
- pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
+ pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
}
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
@@ -174,31 +174,35 @@
}
}
if(force || doPoll)
- pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
+ pollForSequence(null, null, nextMsgBean.getReferenceMessageKey(), nextMsgBean, entry);
}
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
}
- private void pollForSequence(String sequenceId,
- String sequencePropertyKey,
+ private void pollForSequence(String anonUUID, // Only set for RMS polling
+ String internalSeqId, // Only set for RMS polling
String referenceMsgKey,
RMSequenceBean rmBean,
SequenceEntry entry)
throws SandeshaException, SandeshaStorageException, AxisFault
{
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, " + sequenceId + ", " + sequencePropertyKey + ", " + referenceMsgKey + ", " + rmBean);
+ if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollForSequence, rmBean: " + rmBean);
//create a MakeConnection message
String replyTo = rmBean.getReplyToEPR();
String wireSeqId = null;
String wireAddress = null;
- if (SandeshaUtil.isWSRMAnonymous(replyTo)) {
+ if(anonUUID != null) {
+ // If we are polling on a RM anon URI then we don't want to include the sequence id
+ // in the MakeConnection message.
+ wireAddress = anonUUID;
+ } else if(SandeshaUtil.isWSRMAnonymous(replyTo)) {
// If we are polling on a RM anon URI then we don't want to include the sequence id
// in the MakeConnection message.
wireAddress = replyTo;
} else {
- wireSeqId = sequenceId;
+ wireSeqId = rmBean.getSequenceID();
}
MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
@@ -218,7 +222,7 @@
//add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
SenderBean makeConnectionSenderBean = new SenderBean ();
- makeConnectionSenderBean.setInternalSequenceID((rmBean instanceof RMSBean) ? sequencePropertyKey : null); // We only have internal ids for the RMS-side
+ makeConnectionSenderBean.setInternalSequenceID(internalSeqId);
makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/RMSBean.java Wed Feb 28 03:22:57 2007
@@ -80,6 +80,8 @@
private String offeredSequence = null;
+ private String anonymousUUID = null;
+
/**
* This is the timestamp of when the last error occured when sending
*/
@@ -325,6 +327,14 @@
this.rmsFlags |= EXPECTED_REPLIES;
}
+ public String getAnonymousUUID() {
+ return anonymousUUID;
+ }
+
+ public void setAnonymousUUID(String anonymousUUID) {
+ this.anonymousUUID = anonymousUUID;
+ }
+
public String toString() {
StringBuffer result = new StringBuffer();
result.append(this.getClass().getName());
@@ -351,6 +361,7 @@
result.append("\nLastErrorTime : "); result.append(lastSendErrorTimestamp);
}
result.append("\nClientCompletedMsgs: "); result.append(clientCompletedMessages);
+ result.append("\nAnonymous UUID : "); result.append(anonymousUUID);
return result.toString();
}
@@ -392,6 +403,9 @@
else if(bean.getOfferedSequence() != null && !bean.getOfferedSequence().equals(this.getOfferedSequence()))
match = false;
+ else if(bean.getAnonymousUUID() != null && !bean.getAnonymousUUID().equals(this.getAnonymousUUID()))
+ match = false;
+
// Avoid matching on the error information
// else if((bean.rmsFlags & LAST_SEND_ERROR_TIME_FLAG) != 0 && bean.getLastSendErrorTimestamp() != this.getLastSendErrorTimestamp())
// match = false;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Wed Feb 28 03:22:57 2007
@@ -947,13 +947,12 @@
return stackTrace;
}
- public static EndpointReference rewriteEPR(EndpointReference epr, MessageContext mc)
+ public static EndpointReference rewriteEPR(RMSBean sourceBean, EndpointReference epr, ConfigurationContext configContext)
throws SandeshaException
{
if (log.isDebugEnabled())
log.debug("Enter: SandeshaUtil::rewriteEPR " + epr);
- ConfigurationContext configContext = mc.getConfigurationContext();
SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration());
if(!policy.isEnableRMAnonURI()) {
if (log.isDebugEnabled())
@@ -969,24 +968,16 @@
if(address == null ||
AddressingConstants.Final.WSA_ANONYMOUS_URL.equals(address) ||
AddressingConstants.Submission.WSA_ANONYMOUS_URL.equals(address)) {
- // We use the service context to co-ordinate the RM anon uuid, so that several
- // invocations of the same target will yield stable replyTo addresses.
- String uuid = null;
- ServiceContext sc = mc.getServiceContext();
- if(sc == null) {
- String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.serviceContextNotSet);
- throw new SandeshaException(msg);
- }
- synchronized (sc) {
- uuid = (String) sc.getProperty(Sandesha2Constants.RM_ANON_UUID);
- if(uuid == null) {
- uuid = SandeshaUtil.getUUID();
- sc.setProperty(Sandesha2Constants.RM_ANON_UUID, uuid);
- }
+ // We use the sequence to hold the anonymous uuid, so that messages assigned to the
+ // sequence will use the same UUID to identify themselves
+ String uuid = sourceBean.getAnonymousUUID();
+ if(uuid == null) {
+ uuid = Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + SandeshaUtil.getUUID();
+ sourceBean.setAnonymousUUID(uuid);
}
- if(log.isDebugEnabled()) log.debug("Rewriting EPR with UUID " + uuid);
- epr.setAddress(Sandesha2Constants.SPEC_2007_02.ANONYMOUS_URI_PREFIX + uuid);
+ if(log.isDebugEnabled()) log.debug("Rewriting EPR with anon URI " + uuid);
+ epr.setAddress(uuid);
}
if (log.isDebugEnabled())
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=512708&r1=512707&r2=512708
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Wed Feb 28 03:22:57 2007
@@ -20,6 +20,7 @@
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.Parameter;
import org.apache.commons.logging.Log;
@@ -253,8 +254,9 @@
}
}
// In case either of the replyTo or AcksTo is anonymous, rewrite them using the AnonURI template
- replyToEPR = SandeshaUtil.rewriteEPR(replyToEPR, firstAplicationMsgCtx);
- acksToEPR = SandeshaUtil.rewriteEPR(acksToEPR, firstAplicationMsgCtx);
+ ConfigurationContext config = firstAplicationMsgCtx.getConfigurationContext();
+ replyToEPR = SandeshaUtil.rewriteEPR(rmsBean, replyToEPR, config);
+ acksToEPR = SandeshaUtil.rewriteEPR(rmsBean, acksToEPR, config);
// Store both the acksTo and replyTo
if(replyToEPR != null) rmsBean.setReplyToEPR(replyToEPR.getAddress());
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org