You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/12/12 10:24:29 UTC
svn commit: r486070 - in /webservices/sandesha/trunk/java: config/
src/org/apache/sandesha2/handlers/ src/org/apache/sandesha2/i18n/
src/org/apache/sandesha2/msgprocessors/ src/org/apache/sandesha2/polling/
src/org/apache/sandesha2/storage/beans/ src/o...
Author: mlovett
Date: Tue Dec 12 01:24:28 2006
New Revision: 486070
URL: http://svn.apache.org/viewvc?view=rev&rev=486070
Log:
Sync 2-way for WSRM 1.1, see SANDESHA2-62
Added:
webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java (with props)
Modified:
webservices/sandesha/trunk/java/config/module.xml
webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
Modified: webservices/sandesha/trunk/java/config/module.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/config/module.xml?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/config/module.xml (original)
+++ webservices/sandesha/trunk/java/config/module.xml Tue Dec 12 01:24:28 2006
@@ -48,7 +48,6 @@
<!-- namespaces for the 2006-08 spec -->
<actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/SequenceAcknowledgement</actionMapping>
<actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/AckRequested</actionMapping>
- <actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/MakeConnection</actionMapping>
</operation>
<operation name="RMInOutOperation" mep="http://www.w3.org/2004/08/wsdl/in-out">
@@ -61,6 +60,7 @@
<actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/CreateSequence</actionMapping>
<actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/CloseSequence</actionMapping>
<actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/TerminateSequence</actionMapping>
+ <actionMapping>http://docs.oasis-open.org/ws-rx/wsrm/200608/MakeConnection</actionMapping>
</operation>
<operation name="RMOutOnlyOperation" mep="http://www.w3.org/2004/08/wsdl/out-only">
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java Tue Dec 12 01:24:28 2006
@@ -124,7 +124,7 @@
// Dropping duplicates
boolean dropped = dropIfDuplicate(rmMessageContext, storageManager);
if (dropped) {
- returnValue = InvocationResponse.SUSPEND; //the msg has been paused
+ returnValue = InvocationResponse.ABORT; //the msg has been dropped
processDroppedMessage(rmMessageContext, storageManager);
if (log.isDebugEnabled())
log.debug("Exit: SandeshaGlobalInHandler::invoke, dropped " + returnValue);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Tue Dec 12 01:24:28 2006
@@ -257,7 +257,6 @@
public final static String elementMustForSpec = "elementMustForSpec";
public final static String addressingNamespaceNotSet = "addressingNamespaceNotSet";
public final static String couldNotSendCreateSeqResponse = "couldNotSendCreateSeqResponse";
- public final static String invalidOfferNoResponseEndpoint = "invalidOfferNoResponseEndpoint";
public final static String invalidElementFoundWithinElement = "invalidElementFoundWithinElement";
public final static String invokerNotFound="invokerNotFound";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Tue Dec 12 01:24:28 2006
@@ -273,7 +273,6 @@
toBeanNotSet=The 'To' Sequence Property Bean has not been set for the sequence.
replyToBeanNotSet=The 'ReplyTo' Sequence Property Bean has not been set for the sequence.
cannotFindTransportInDesc=Cannot find the transport in description {0} in the ConfigurationContext
-invalidOfferNoResponseEndpoint=Cannot derive a valid offer from the given infomation. No Endpoint for receiving messages.
invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}'' element
invokerNotFound=An invoker thread was not found to dispatch messages on the inbound sequence {0}.
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Tue Dec 12 01:24:28 2006
@@ -437,23 +437,6 @@
rmMsgCtx.setMessageId(messageId1);
}
- if (serverSide) {
- // let the request end with 202 if a ack has not been
- // written in the incoming thread.
-
- MessageContext reqMsgCtx = null;
- try {
- reqMsgCtx = msgContext.getOperationContext().getMessageContext(
- WSDLConstants.MESSAGE_LABEL_IN_VALUE);
- } catch (AxisFault e) {
- throw new SandeshaException(e);
- }
-
- if (reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN) == null
- || !"true".equals(reqMsgCtx.getProperty(Sandesha2Constants.ACK_WRITTEN)))
- reqMsgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
- }
-
EndpointReference toEPR = msgContext.getTo();
if (toEPR == null) {
String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.toEPRNotValid, null);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Tue Dec 12 01:24:28 2006
@@ -8,7 +8,10 @@
import org.apache.axis2.Constants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.TransportOutDescription;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
@@ -33,13 +36,16 @@
*/
public class MakeConnectionProcessor implements MsgProcessor {
+ private static final Log log = LogFactory.getLog(MakeConnectionProcessor.class);
+
/**
* Prosesses incoming MakeConnection request messages.
* A message is selected by the set of SenderBeans that are waiting to be sent.
* This is processed using a SenderWorker.
*/
public boolean processInMessage(RMMsgContext rmMsgCtx) throws AxisFault {
-
+ if(log.isDebugEnabled()) log.debug("Entry: MakeConnectionProcessor::processInMessage");
+
MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
Address address = makeConnection.getAddress();
Identifier identifier = makeConnection.getIdentifier();
@@ -54,24 +60,16 @@
findSenderBean.setSend(true);
if (address!=null)
- findSenderBean.setWsrmAnonURI(address.getAddress());
+ findSenderBean.setToAddress(address.getAddress());
if (identifier!=null)
findSenderBean.setSequenceID(identifier.getIdentifier());
//finding the beans that go with the criteria of the passed SenderBean
-
- //beans with reSend=true
- findSenderBean.setReSend(true);
+ //The reSend flag is ignored for this selection, so there is no need to
+ //set it.
Collection collection = senderBeanMgr.find(findSenderBean);
- //beans with reSend=false
- findSenderBean.setReSend (false);
- Collection collection2 = senderBeanMgr.find(findSenderBean);
-
- //all possible beans
- collection.addAll(collection2);
-
//selecting a bean to send RANDOMLY. TODO- Should use a better mechanism.
int size = collection.size();
int itemToPick=-1;
@@ -90,14 +88,15 @@
SenderBean senderBean = null;
for (int item=0;item<size;item++) {
-
senderBean = (SenderBean) it.next();
if (item==itemToPick)
break;
}
- if (senderBean==null)
+ if (senderBean==null) {
+ if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
return false;
+ }
TransportOutDescription transportOut = rmMsgCtx.getMessageContext().getTransportOut();
if (transportOut==null) {
@@ -110,15 +109,17 @@
MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,configurationContext);
RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
-
- addMessagePendingHeader (returnRMMsg,pending);
+ if(pending) addMessagePendingHeader (returnRMMsg,pending);
setTransportProperties (returnMessage, rmMsgCtx);
- //setting that the response gets written written.
- //This will be used by transports. For e.g. CommonsHTTPTransportSender will send 200 OK, instead of 202.
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN , Constants.VALUE_TRUE);
+ // Link the response to the request
+ OperationContext context = rmMsgCtx.getMessageContext().getOperationContext();
+ context.addMessageContext(returnMessage);
+ returnMessage.setOperationContext(context);
+ // Store the response again
+ storageManager.updateMessageContext(messageStorageKey, returnMessage);
//running the MakeConnection through a SenderWorker.
//This will allow Sandesha2 to consider both of following senarios equally.
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Tue Dec 12 01:24:28 2006
@@ -119,13 +119,6 @@
secManager.checkProofOfPossession(token, body, msgCtx);
}
- //RM will not send sync responses. If sync acks are there this will be
- // made true again later.
- if (rmMsgCtx.getMessageContext().getOperationContext() != null) {
- rmMsgCtx.getMessageContext().getOperationContext().setProperty(Constants.RESPONSE_WRITTEN,
- Constants.VALUE_FALSE);
- }
-
// setting acked msg no range
ConfigurationContext configCtx = rmMsgCtx.getMessageContext().getConfigurationContext();
if (configCtx == null) {
@@ -303,10 +296,18 @@
RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, sequencePropertyKey, sequenceId, storageManager);
MessageContext ackMsgCtx = ackRMMsgCtx.getMessageContext();
- EndpointReference acksTo = ackRMMsgCtx.getTo();
-
- if (SandeshaUtil.isAnonymousURI (acksTo.getAddress())) {
+ boolean anonAck = ackRMMsgCtx.getTo().hasAnonymousAddress();
+ EndpointReference replyTo = rmMsgCtx.getReplyTo();
+ // Only use the backchannel for ack messages if we are sure that the application
+ // doesn't need it. A 1-way MEP should be complete by now.
+ boolean complete = ackMsgCtx.getOperationContext().isComplete();
+ if (anonAck && !complete && (replyTo == null || replyTo.hasAnonymousAddress())) {
+ if (log.isDebugEnabled()) log.debug("Exit: SequenceProcessor::sendAckIfNeeded, avoiding using backchannel");
+ return;
+ }
+
+ if(anonAck) {
// setting CONTEXT_WRITTEN since acksto is anonymous
if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
// operation context will be null when doing in a GLOBAL
@@ -326,7 +327,7 @@
AxisEngine engine = new AxisEngine(configCtx);
engine.send(ackRMMsgCtx.getMessageContext());
- } else {
+ } else if(!anonAck) {
// / Transaction asyncAckTransaction =
// storageManager.getTransaction();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Dec 12 01:24:28 2006
@@ -17,7 +17,6 @@
package org.apache.sandesha2.polling;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
@@ -26,17 +25,15 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
-import org.apache.axis2.engine.AxisEngine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
-import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
-import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
-import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MsgInitializer;
@@ -48,6 +45,7 @@
* keeps running. Will do MakeConnection based on the request queue or randomly.
*/
public class PollingManager extends Thread {
+ private static final Log log = LogFactory.getLog(PollingManager.class);
private ConfigurationContext configurationContext = null;
private StorageManager storageManager = null;
@@ -60,112 +58,112 @@
private final int POLLING_MANAGER_WAIT_TIME = 3000;
public void run() {
-
-
while (isPoll()) {
-
+ Transaction t = null;
try {
-
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
-
- //geting the sequences to be polled.
- //if shedule contains any requests, do the earliest one.
- //else pick one randomly.
-
- String sequenceId = getNextSheduleEntry ();
-
- RMDBean nextMsgBean = null;
-
- if (sequenceId==null) {
-
- RMDBean findBean = new RMDBean ();
- findBean.setPollingMode(true);
-
- List results = nextMsgMgr.find(findBean);
- int size = results.size();
- if (size>0) {
- Random random = new Random ();
- int item = random.nextInt(size);
- nextMsgBean = (RMDBean) results.get(item);
+ t = storageManager.getTransaction();
+ internalRun();
+ t.commit();
+ t = null;
+ } catch (Exception e) {
+ if(log.isDebugEnabled()) log.debug("Exception", e);
+ if(t != null) {
+ try {
+ t.rollback();
+ } catch(Exception e2) {
+ if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
}
-
-
-
- } else {
- RMDBean findBean = new RMDBean ();
- findBean.setPollingMode(true);
- findBean.setSequenceID(sequenceId);
-
- nextMsgBean = nextMsgMgr.findUnique(findBean);
- }
-
- //If not valid entry is found, try again later.
- if (nextMsgBean==null)
- continue;
-
- sequenceId = nextMsgBean.getSequenceID();
-
- //create a MakeConnection message
- String referenceMsgKey = nextMsgBean.getReferenceMessageKey();
-
- String sequencePropertyKey = sequenceId;
- String replyTo = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,storageManager);
- String WSRMAnonReplyToURI = null;
- if (SandeshaUtil.isWSRMAnonymousReplyTo(replyTo))
- WSRMAnonReplyToURI = replyTo;
-
- MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
- RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
- RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
- sequenceId , WSRMAnonReplyToURI,storageManager);
-
- makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
- //storing the MakeConnection message.
- String makeConnectionMsgStoreKey = SandeshaUtil.getUUID();
-
- makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
- sequencePropertyKey);
-
- //add an entry for the MakeConnection message to the sender (with ,send=true, resend=false)
- SenderBean makeConnectionSenderBean = new SenderBean ();
-// makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
- makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
- makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
- makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
- makeConnectionSenderBean.setReSend(false);
- makeConnectionSenderBean.setSend(true);
- makeConnectionSenderBean.setSequenceID(sequenceId);
- EndpointReference to = makeConnectionRMMessage.getTo();
- if (to!=null)
- makeConnectionSenderBean.setToAddress(to.getAddress());
-
- SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
-
- //this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
- makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
-
- SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
-
- senderBeanMgr.insert(makeConnectionSenderBean);
- } catch (SandeshaStorageException e) {
- e.printStackTrace();
- } catch (SandeshaException e) {
- e.printStackTrace();
- } catch (AxisFault e) {
- e.printStackTrace();
- } finally {
- try {
- Thread.sleep(POLLING_MANAGER_WAIT_TIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ t = null;
}
}
-
+ try {
+ Thread.sleep(POLLING_MANAGER_WAIT_TIME);
+ } catch (InterruptedException e) {
+ if(log.isDebugEnabled()) log.debug("Sleep was interrupted", e);
+ }
}
}
+ private void internalRun() throws AxisFault {
+ RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
+
+ //getting the sequences to be polled.
+ //if schedule contains any requests, do the earliest one.
+ //else pick one randomly.
+
+ String sequenceId = getNextSheduleEntry ();
+ RMDBean nextMsgBean = null;
+
+ RMDBean findBean = new RMDBean();
+ findBean.setPollingMode(true);
+ findBean.setSequenceID(sequenceId); // Note that this may be null
+ List results = nextMsgMgr.find(findBean);
+ int size = results.size();
+ if (size>0) {
+ Random random = new Random ();
+ int item = random.nextInt(size);
+ nextMsgBean = (RMDBean) results.get(item);
+ }
+
+ //If no valid entry is found, try again later.
+ if (nextMsgBean==null) {
+ if(log.isDebugEnabled()) log.debug("No polling requests queued");
+ return;
+ }
+ sequenceId = nextMsgBean.getSequenceID();
+
+ if(log.isDebugEnabled()) log.debug("Polling for sequence " + sequenceId);
+
+ //create a MakeConnection message
+ String referenceMsgKey = nextMsgBean.getReferenceMessageKey();
+
+ String sequencePropertyKey = sequenceId;
+ String replyTo = SandeshaUtil.getSequenceProperty(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR,storageManager);
+ String WSRMAnonReplyToURI = null;
+ if (SandeshaUtil.isWSRMAnonymousReplyTo(replyTo))
+ WSRMAnonReplyToURI = replyTo;
+
+ MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
+ RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
+ RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
+ sequenceId , WSRMAnonReplyToURI,storageManager);
+
+ // Put our transaction onto the message context
+ makeConnectionRMMessage.setProperty(Sandesha2Constants.WITHIN_TRANSACTION, Sandesha2Constants.VALUE_TRUE);
+
+ makeConnectionRMMessage.setProperty(MessageContext.TRANSPORT_IN,null);
+ //storing the MakeConnection message.
+ String makeConnectionMsgStoreKey = SandeshaUtil.getUUID();
+
+ makeConnectionRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY,
+ sequencePropertyKey);
+
+ //add an entry for the MakeConnection message to the sender (with send=true, resend=false)
+ SenderBean makeConnectionSenderBean = new SenderBean ();
+// makeConnectionSenderBean.setInternalSequenceID(internalSequenceId);
+ makeConnectionSenderBean.setMessageContextRefKey(makeConnectionMsgStoreKey);
+ makeConnectionSenderBean.setMessageID(makeConnectionRMMessage.getMessageId());
+ makeConnectionSenderBean.setMessageType(Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG);
+ makeConnectionSenderBean.setReSend(false);
+ makeConnectionSenderBean.setSend(true);
+ makeConnectionSenderBean.setSequenceID(sequenceId);
+ EndpointReference to = makeConnectionRMMessage.getTo();
+ if (to!=null)
+ makeConnectionSenderBean.setToAddress(to.getAddress());
+
+ SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
+
+ //this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
+ makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
+
+ SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
+
+ senderBeanMgr.insert(makeConnectionSenderBean);
+ }
+
private synchronized String getNextSheduleEntry () {
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::getNextSheduleEntry");
String sequenceId = null;
if (sheduledPollingRequests.size()>0) {
@@ -178,6 +176,7 @@
}
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::getNextSheduleEntry, " + sequenceId);
return sequenceId;
}
@@ -188,11 +187,15 @@
* @throws SandeshaException
*/
public synchronized void start (ConfigurationContext configurationContext) throws SandeshaException {
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::start");
+
this.configurationContext = configurationContext;
this.sheduledPollingRequests = new HashMap ();
this.storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
setPoll(true);
super.start();
+
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::start");
}
/**
@@ -200,14 +203,20 @@
*
*/
public synchronized void stopPolling () {
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::stopPolling");
setPoll(false);
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::stopPolling");
}
public synchronized void setPoll (boolean poll) {
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::setPoll");
this.poll = poll;
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::setPoll");
}
public synchronized boolean isPoll () {
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::isPoll");
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::isPoll");
return poll;
}
@@ -222,8 +231,7 @@
* @param sequenceId
*/
public synchronized void shedulePollingRequest (String sequenceId) {
-
- System.out.println("Polling request sheduled for sequence:" + sequenceId);
+ if(log.isDebugEnabled()) log.debug("Entry: PollingManager::shedulePollingRequest, " + sequenceId);
if (sheduledPollingRequests.containsKey (sequenceId)) {
Integer sequenceEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
@@ -234,7 +242,6 @@
sheduledPollingRequests.put(sequenceId, sequenceEntryCount);
}
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::shedulePollingRequest");
}
-
-
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/beans/SenderBean.java Tue Dec 12 01:24:28 2006
@@ -90,11 +90,6 @@
private String sequenceID;
/**
- * TODO use the value in CreateSequenceBean.
- */
- private String wsrmAnonURI;
-
- /**
* Destination URL of the message to be sent. This can be used to decide weather the message cannot be sent,
* before actyally reading the message from the storage.
*/
@@ -194,14 +189,6 @@
public void setSequenceID(String sequenceID) {
this.sequenceID = sequenceID;
- }
-
- public String getWsrmAnonURI() {
- return wsrmAnonURI;
- }
-
- public void setWsrmAnonURI(String wsrmAnonURI) {
- this.wsrmAnonURI = wsrmAnonURI;
}
public String getToAddress() {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Tue Dec 12 01:24:28 2006
@@ -110,6 +110,13 @@
log.debug("isSend didn't match");
add = false;
}
+
+ if (bean.getToAddress() != null
+ && !bean.getToAddress().equals("")
+ && !bean.getToAddress().equals(temp.getToAddress())) {
+ log.debug("ToAddress didn't match");
+ add = false;
+ }
// Do not use the isReSend flag to match messages, as it can stop us from
// detecting RM messages during 'getNextMsgToSend'
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/AcknowledgementManager.java Tue Dec 12 01:24:28 2006
@@ -47,6 +47,7 @@
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
+import org.apache.sandesha2.wsrm.Sequence;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
/**
@@ -68,21 +69,34 @@
throws SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
-
+
ConfigurationContext configurationContext = rmMessageContext.getConfigurationContext();
-
SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();
+ // If this message is going to an anonymous address then we add in an ack for the
+ // sequence that was used on the inbound side.
+ EndpointReference target = rmMessageContext.getTo();
+ if(target.hasAnonymousAddress()) {
+ Sequence sequence = (Sequence) rmMessageContext.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
+ if(sequence != null) {
+ String outboundSequenceId = sequence.getIdentifier().getIdentifier();
+ String outboundInternalSeq = SandeshaUtil.getSequenceProperty(outboundSequenceId,
+ Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID, storageManager);
+ String inboundSequenceId = SandeshaUtil.getServerSideIncomingSeqIdFromInternalSeqId(outboundInternalSeq);
+
+ if(log.isDebugEnabled()) log.debug("Piggybacking ack for " + inboundSequenceId);
+ RMMsgCreator.addAckMessage(rmMessageContext, inboundSequenceId, inboundSequenceId, storageManager);
+ }
+ if(log.isDebugEnabled()) log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent, anon");
+ return;
+ }
+
SenderBean findBean = new SenderBean();
-
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
findBean.setSend(true);
- findBean.setReSend(false);
-
- String carrietTo = rmMessageContext.getTo().getAddress();
+ findBean.setToAddress(target.getAddress());
Collection collection = retransmitterBeanMgr.find(findBean);
-
Iterator it = collection.iterator();
piggybackLoop: while (it.hasNext()) {
@@ -95,12 +109,6 @@
MessageContext ackMsgContext = storageManager.retrieveMessageContext(ackBean.getMessageContextRefKey(),
configurationContext);
-
- // wsa:To has to match for piggybacking.
- String to = ackMsgContext.getTo().getAddress();
- if (!carrietTo.equals(to)) {
- continue piggybackLoop;
- }
if (log.isDebugEnabled()) log.debug("Adding ack headers");
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/RMMsgCreator.java Tue Dec 12 01:24:28 2006
@@ -26,6 +26,7 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
@@ -134,8 +135,19 @@
if (offeredEndpoint==null) {
EndpointReference replyTo = applicationMsgContext.getReplyTo(); //using replyTo as the Endpoint if it is not specified
- if (replyTo!=null)
+ if (replyTo!=null) {
offeredEndpoint = SandeshaUtil.cloneEPR(replyTo);
+ }
+ }
+ // Finally fall back to using an anonymous endpoint
+ if (offeredEndpoint==null) {
+ String anon = null;
+ if(AddressingConstants.Final.WSA_NAMESPACE.equals(addressingNamespaceValue)) {
+ anon = AddressingConstants.Final.WSA_ANONYMOUS_URL;
+ } else {
+ anon = AddressingConstants.Submission.WSA_ANONYMOUS_URL;
+ }
+ offeredEndpoint = new EndpointReference(anon);
}
if (offeredSequence != null && !"".equals(offeredSequence)) {
SequenceOffer offerPart = new SequenceOffer(rmNamespaceValue);
@@ -145,15 +157,9 @@
createSequencePart.setSequenceOffer(offerPart);
if (Sandesha2Constants.SPEC_2006_08.NS_URI.equals(rmNamespaceValue)) {
- if (offeredEndpoint!=null) {
- Endpoint endpoint = new Endpoint (rmNamespaceValue,addressingNamespaceValue);
- endpoint.setEPR (offeredEndpoint);
- offerPart.setEndpoint(endpoint);
- } else {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.invalidOfferNoResponseEndpoint);
- throw new SandeshaException (message);
- }
+ Endpoint endpoint = new Endpoint (rmNamespaceValue,addressingNamespaceValue);
+ endpoint.setEPR (offeredEndpoint);
+ offerPart.setEndpoint(endpoint);
}
}
}
@@ -489,7 +495,7 @@
* @throws SandeshaException
*/
public static void addAckMessage(RMMsgContext applicationMsg, String sequencePropertyKey ,String sequenceId, StorageManager storageManager)
- throws AxisFault {
+ throws SandeshaException {
if(log.isDebugEnabled())
log.debug("Entry: RMMsgCreator::addAckMessage " + sequenceId);
@@ -548,7 +554,11 @@
applicationMsg.setMessageId(SandeshaUtil.getUUID());
//generating the SOAP envelope.
- applicationMsg.addSOAPEnvelope();
+ try {
+ applicationMsg.addSOAPEnvelope();
+ } catch(AxisFault e) {
+ throw new SandeshaException(e);
+ }
// Ensure the message also contains the token that needs to be used
secureOutboundMessage(sequencePropertyKey, applicationMsg.getMessageContext());
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java Tue Dec 12 01:24:28 2006
@@ -279,13 +279,14 @@
}
} else {
+ //setting replyTo, which defaults to anonymous
+ String replyTo = anonymousURI;
EndpointReference replyToEPR = firstAplicationMsgCtx.getReplyTo();
- //setting replyTo and acksTo beans.
-
- if (replyToEPR!=null)
- replyToBean = new SequencePropertyBean(sequencePropertyKey,
- Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, replyToEPR.getAddress());
-
+ if (replyToEPR!=null) replyTo = replyToEPR.getAddress();
+
+ replyToBean = new SequencePropertyBean(sequencePropertyKey,
+ Sandesha2Constants.SequenceProperties.REPLY_TO_EPR, replyTo);
+
//TODO set AcksToBean.
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SpecSpecificConstants.java Tue Dec 12 01:24:28 2006
@@ -351,11 +351,11 @@
case Sandesha2Constants.MessageTypes.CREATE_SEQ:
case Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE:
case Sandesha2Constants.MessageTypes.TERMINATE_SEQ:
+ case Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG:
result = service.getOperation(new QName("RMOutInOperation"));
break;
case Sandesha2Constants.MessageTypes.ACK:
case Sandesha2Constants.MessageTypes.ACK_REQUEST:
- case Sandesha2Constants.MessageTypes.MAKE_CONNECTION_MSG:
result = service.getOperation(new QName("RMOutOnlyOperation"));
break;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Tue Dec 12 01:24:28 2006
@@ -17,7 +17,6 @@
package org.apache.sandesha2.workers;
-import org.apache.axis2.addressing.AddressingConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
@@ -98,31 +97,6 @@
log.debug(message);
}
continue;
- }
-
- String toAddress = senderBean.getToAddress();
- if (toAddress != null) {
- boolean unsendableAddress = false;
-
- if (toAddress
- .equals(AddressingConstants.Submission.WSA_ANONYMOUS_URL))
- unsendableAddress = true;
- else if (toAddress
- .equals(AddressingConstants.Final.WSA_ANONYMOUS_URL))
- unsendableAddress = true;
- else if (toAddress
- .startsWith(Sandesha2Constants.WSRM_ANONYMOUS_URI_PREFIX))
- unsendableAddress = true;
-
- if (unsendableAddress) {
- if (log.isDebugEnabled()) {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.cannotSendToTheAddress,
- toAddress);
- log.debug(message);
- }
- continue;
- }
}
// work Id is used to define the piece of work that will be
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=486070&r1=486069&r2=486070
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SenderWorker.java Tue Dec 12 01:24:28 2006
@@ -6,11 +6,15 @@
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axiom.soap.SOAPFault;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.commons.logging.Log;
@@ -113,6 +117,23 @@
return;
}
+ // If we are sending to the anonymous URI then we _must_ have a transport waiting,
+ // or the message can't go anywhere. If there is nothing here then we leave the
+ // message in the sender queue, and a MakeConnection will hopefully pick it up
+ // soon.
+ EndpointReference toEPR = msgCtx.getTo();
+ if(toEPR.hasAnonymousAddress()) {
+ RequestResponseTransport t = null;
+ MessageContext inMsg = null;
+ OperationContext op = msgCtx.getOperationContext();
+ if(op != null) inMsg = op.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if(inMsg != null) t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ if(t == null) {
+ if(log.isDebugEnabled()) log.debug("Exit: SenderWorker::run, no response transport for anonymous message");
+ return;
+ }
+ }
+
updateMessage(msgCtx);
int messageType = senderBean.getMessageType();
@@ -142,7 +163,6 @@
if (transportOut!=null)
msgCtx.setTransportOut(transportOut);
-
boolean successfullySent = false;
@@ -176,7 +196,19 @@
AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
if (log.isDebugEnabled())
log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
- engine.resumeSend(msgCtx);
+ InvocationResponse response = engine.resumeSend(msgCtx);
+ if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
+ if(response != InvocationResponse.SUSPEND) {
+ RequestResponseTransport t = null;
+ MessageContext inMsg = null;
+ OperationContext op = msgCtx.getOperationContext();
+ if(op != null) inMsg = op.getMessageContext(OperationContextFactory.MESSAGE_LABEL_IN_VALUE);
+ if(inMsg != null) t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+ if(t != null) {
+ if(log.isDebugEnabled()) log.debug("Signalling transport in " + t);
+ if(t != null) t.signalResponseReady();
+ }
+ }
successfullySent = true;
} catch (Exception e) {
Added: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java?view=auto&rev=486070
==============================================================================
--- webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java (added)
+++ webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java Tue Dec 12 01:24:28 2006
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.sandesha2.scenarios;
+
+import java.io.File;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.Constants;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.SandeshaTestCase;
+import org.apache.sandesha2.client.SandeshaClient;
+import org.apache.sandesha2.client.SandeshaClientConstants;
+import org.apache.sandesha2.client.SequenceReport;
+import org.apache.sandesha2.util.SandeshaUtil;
+
+/**
+ * This testcase is similar to the AnonymousAckEchoTest, but the replyTo EPR
+ * is also anonymous, so all the server->client message flows use the HTTP
+ * backchannel.
+ */
+ public class AnonymousEchoTest extends SandeshaTestCase {
+ // Passes this test's name string to the superclass constructor.
+ public AnonymousEchoTest () {
+ super ("AnonymousEchoTest");
+ }
+ // Starts the server from the repository and server_axis2.xml found under target/repos/server.
+ public void setUp () throws Exception {
+ super.setUp();
+ String repoPath = "target" + File.separator + "repos" + File.separator + "server";
+ String axis2_xml = repoPath + File.separator + "server_axis2.xml";
+ startServer(repoPath, axis2_xml);
+ }
+ // Sends one echo request (RM spec version v1_1, with an offered sequence id), checks the reply, then polls both sequence reports.
+ public void testSyncEcho () throws Exception {
+ String to = "http://127.0.0.1:" + serverPort + "/axis2/services/RMSampleService";
+
+ String repoPath = "target" + File.separator + "repos" + File.separator + "client";
+ String axis2_xml = repoPath + File.separator + "client_axis2.xml";
+ ConfigurationContext configContext = ConfigurationContextFactory.createConfigurationContextFromFileSystem(repoPath,axis2_xml);
+
+ Options clientOptions = new Options ();
+ clientOptions.setAction(echoAction);
+ clientOptions.setTo(new EndpointReference (to));
+ String sequenceKey = SandeshaUtil.getUUID();
+ String offeredSequenceID = SandeshaUtil.getUUID();
+ clientOptions.setProperty(SandeshaClientConstants.LAST_MESSAGE, "true");
+ clientOptions.setProperty(SandeshaClientConstants.SEQUENCE_KEY,sequenceKey);
+ clientOptions.setProperty(SandeshaClientConstants.OFFERED_SEQUENCE_ID,offeredSequenceID);
+ clientOptions.setProperty(SandeshaClientConstants.RM_SPEC_VERSION,Sandesha2Constants.SPEC_VERSIONS.v1_1);
+ clientOptions.setTransportInProtocol(Constants.TRANSPORT_HTTP);
+
+ // Put in enough config to convince Axis that this is async, without setting up
+ // a new WS-Addressing replyTo etc. (no replyTo EPR is set on the options here).
+ clientOptions.setUseSeparateListener(true);
+ clientOptions.setProperty(Constants.Configuration.USE_CUSTOM_LISTENER,Boolean.TRUE);
+
+ ServiceClient serviceClient = new ServiceClient (configContext,null);
+ serviceClient.setOptions(clientOptions);
+ // Send the echo request and check that the reply carries the same text that was sent.
+ OMElement reply = serviceClient.sendReceive(getEchoOMBlock("echo1",sequenceKey));
+ String replyText = checkEchoOMBlock(reply);
+ assertEquals("echo1", replyText);
+ // Retry the sequence-report assertions until they pass or waitTime elapses; the last failure is rethrown below.
+ long limit = System.currentTimeMillis() + waitTime;
+ Error lastError = null;
+ while(System.currentTimeMillis() < limit) {
+ Thread.sleep(tickTime); // Try the assertions each tick interval, until they pass or we time out
+
+ try {
+ // Assertions for the out sequence: message 1 completed, direction OUT, status TERMINATED.
+ SequenceReport sequenceReport = SandeshaClient.getOutgoingSequenceReport(serviceClient);
+ assertTrue(sequenceReport.getCompletedMessages().contains(new Long(1)));
+ assertEquals(SequenceReport.SEQUENCE_DIRECTION_OUT, sequenceReport.getSequenceDirection());
+ assertEquals(SequenceReport.SEQUENCE_STATUS_TERMINATED, sequenceReport.getSequenceStatus());
+
+ // Assertions for the in sequence (looked up by the offered sequence id): same checks, direction IN.
+ sequenceReport = SandeshaClient.getIncomingSequenceReport(offeredSequenceID, configContext);
+ assertTrue(sequenceReport.getCompletedMessages().contains(new Long(1)));
+ assertEquals(SequenceReport.SEQUENCE_DIRECTION_IN, sequenceReport.getSequenceDirection());
+ assertEquals(SequenceReport.SEQUENCE_STATUS_TERMINATED, sequenceReport.getSequenceStatus());
+
+ lastError = null;
+ break;
+ } catch(Error e) { // keep the failure and retry on the next tick (assumes assert* failures are Error subclasses - confirm)
+ lastError = e;
+ }
+ }
+
+ if(lastError != null) throw lastError;
+ // Stop the listener manager and clean up the client. NOTE(review): not reached when lastError is rethrown above.
+ configContext.getListenerManager().stop();
+ serviceClient.cleanup();
+ }
+
+ }
Propchange: webservices/sandesha/trunk/java/test/src/org/apache/sandesha2/scenarios/AnonymousEchoTest.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org