You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2007/06/07 16:51:47 UTC
svn commit: r545205 - in
/webservices/sandesha/branches/sandesha2/java/temp/post_1_2: config/
modules/core/src/main/java/org/apache/sandesha2/
modules/core/src/main/java/org/apache/sandesha2/client/
modules/core/src/main/java/org/apache/sandesha2/msgpr...
Author: chamikara
Date: Thu Jun 7 07:51:45 2007
New Revision: 545205
URL: http://svn.apache.org/viewvc?view=rev&rev=545205
Log:
Some temporary changes
Modified:
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/config/log4j.properties
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/policy/SandeshaPolicyBean.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/mar/module.xml
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/config/log4j.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/config/log4j.properties?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/config/log4j.properties (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/config/log4j.properties Thu Jun 7 07:51:45 2007
@@ -8,7 +8,7 @@
# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.Threshold=DEBUG
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=- %m%n
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java Thu Jun 7 07:51:45 2007
@@ -561,7 +561,12 @@
static final QName RM_IN_OUT_OPERATION = new QName(SANDESHA_OP_PREFIX + "InOut");
static final QName RM_IN_ONLY_OPERATION = new QName(SANDESHA_OP_PREFIX + "InOnly");
+ static final String MESSAGE_PROCESSED = "MessageProcessed";
+
static final String OUT_LAST_MESSAGE = "OutLastMessage";
+
+ static final String TRANSOPORT_USED = "TransportUsed";
+
static final String [] SPEC_NS_URIS = {
SPEC_2005_02.NS_URI,
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java Thu Jun 7 07:51:45 2007
@@ -72,6 +72,8 @@
private static final Log log = LogFactory.getLog(SandeshaClient.class);
+ private static ArrayList listeners = new ArrayList ();
+
/**
* Users can get a SequenceReport of the sequence defined by the information
* given from the passed serviceClient object.
@@ -1408,6 +1410,14 @@
} catch (AxisFault e) {
throw new SandeshaException (e);
}
+ }
+
+ public static synchronized void addSandeshaListener (SandeshaListener listener) {
+ listeners.add(listener);
+ }
+
+ public static synchronized Iterator getListeners () {
+ return listeners.iterator();
}
}
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Thu Jun 7 07:51:45 2007
@@ -91,6 +91,8 @@
outMessageContext.setProperty (Constants.OUT_TRANSPORT_INFO, msgContext.getProperty(Constants.OUT_TRANSPORT_INFO));
outMessageContext.setProperty (MessageContext.TRANSPORT_OUT, msgContext.getProperty(MessageContext.TRANSPORT_OUT));
+ outMessageContext.setProperty(Sandesha2Constants.TRANSOPORT_USED, Boolean.FALSE);
+
//add the SOAP envelope with body null
SOAPFactory factory = (SOAPFactory) msgContext.getEnvelope().getOMFactory();
SOAPEnvelope envelope = factory.getDefaultEnvelope();
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Thu Jun 7 07:51:45 2007
@@ -1,7 +1,9 @@
package org.apache.sandesha2.msgprocessors;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import org.apache.axis2.AxisFault;
@@ -148,6 +150,31 @@
// Someone else has either removed the sender & message, or another make connection got here first.
return;
}
+
+ RequestResponseTransport transportControl = (RequestResponseTransport) returnMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ //if transport control is not null, this means that this response already belongs to a request message.
+ //So we would need to clone it (the cloning code below is currently commented out).
+// if (transportControl!=null) {
+// returnMessage = SandeshaUtil.cloneMessageContext(returnMessage);
+// }
+
+ returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
+
+
+// if (transportControl==null)
+// else {
+//// //since there already is a transportControl we add both to a transport control Map
+//// List transportControlList = (List) returnMessage.getProperty(Sandesha2Constants.TRANSPORT_CONTROL_LIST);
+//// if (transportControlList==null) {
+//// transportControlList = new ArrayList ();
+//// returnMessage.setProperty(Sandesha2Constants.TRANSPORT_CONTROL_LIST, transportControlList);
+//// transportControlList.add(transportControl);
+//// returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, null);
+//// }
+//
+//
+// }
if(pending) addMessagePendingHeader(returnMessage, makeConnectionNamespace);
@@ -186,10 +213,6 @@
returnMessage.setOperationContext(context);
returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
- returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
-
- //marking pollMessage as responsed
- pollMessage.getMessageContext().getOperationContext().setProperty (Constants.RESPONSE_WRITTEN,Constants.VALUE_TRUE);
// Commit the current transaction, so that the SenderWorker can do its own locking
if(transaction != null && transaction.isActive()) transaction.commit();
@@ -201,7 +224,6 @@
SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
worker.setMessage(returnRMMsg);
worker.run();
-
if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
}
@@ -224,5 +246,7 @@
returnMessage.setProperty(Constants.Configuration.CONTENT_TYPE, contentType);
returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
+
+ returnMessage.setProperty(Sandesha2Constants.TRANSOPORT_USED, Boolean.FALSE);
}
}
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Thu Jun 7 07:51:45 2007
@@ -209,6 +209,8 @@
message.setProperty(MessageContext.TRANSPORT_OUT,terminateSeqMsg.getProperty(MessageContext.TRANSPORT_OUT));
message.setProperty(Constants.OUT_TRANSPORT_INFO, terminateSeqMsg.getProperty(Constants.OUT_TRANSPORT_INFO));
+ message.setProperty(Sandesha2Constants.TRANSOPORT_USED, Boolean.FALSE);
+
terminateSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
AxisEngine engine = new AxisEngine(context);
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/policy/SandeshaPolicyBean.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/policy/SandeshaPolicyBean.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/policy/SandeshaPolicyBean.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/policy/SandeshaPolicyBean.java Thu Jun 7 07:51:45 2007
@@ -38,6 +38,11 @@
public class SandeshaPolicyBean implements Assertion {
+ public static final String SECONDS = "seconds";
+ public static final String MINUTES = "minutes";
+ public static final String HOURS = "hours";
+ public static final String DAYS = "days";
+
private SandeshaPolicyBean parent = null;
// String storageManagerClass = null;
@@ -55,7 +60,7 @@
private long inactiveTimeoutValue;
private boolean inactiveTimeoutValueSet = false;
- private String inactivityTimeoutMeasure;
+ private String inactivityTimeoutMeasure = SECONDS;
private long inactivityTimeoutInterval = -1;
private boolean inactivityTimeoutIntervalSet = false;
@@ -95,13 +100,13 @@
if (measure == null) {
this.inactivityTimeoutInterval = value;
- } else if ("seconds".equals(measure)) {
+ } else if (SECONDS.equals(measure)) {
timeOut = value * 1000;
- } else if ("minutes".equals(measure)) {
+ } else if (MINUTES.equals(measure)) {
timeOut = value * 60 * 1000;
- } else if ("hours".equals(measure)) {
+ } else if (HOURS.equals(measure)) {
timeOut = value * 60 * 60 * 1000;
- } else if ("days".equals(measure)) {
+ } else if (DAYS.equals(measure)) {
timeOut = value * 24 * 60 * 60 * 1000;
}
@@ -114,13 +119,13 @@
if (measure == null) {
this.sequenceRemovalTimeoutInterval = value;
- } else if ("seconds".equals(measure)) {
+ } else if (SECONDS.equals(measure)) {
timeOut = value * 1000;
- } else if ("minutes".equals(measure)) {
+ } else if (MINUTES.equals(measure)) {
timeOut = value * 60 * 1000;
- } else if ("hours".equals(measure)) {
+ } else if (HOURS.equals(measure)) {
timeOut = value * 60 * 60 * 1000;
- } else if ("days".equals(measure)) {
+ } else if (DAYS.equals(measure)) {
timeOut = value * 24 * 60 * 60 * 1000;
}
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Thu Jun 7 07:51:45 2007
@@ -74,7 +74,7 @@
matcher.setSend(true);
matcher.setSequenceID(sequenceId);
matcher.setTimeToSend(System.currentTimeMillis());
- matcher.setTransportAvailable(true);
+// matcher.setTransportAvailable(true);
List matches = super.find(matcher);
if(log.isDebugEnabled()) log.debug("Found " + matches.size() + " messages");
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Thu Jun 7 07:51:45 2007
@@ -55,6 +55,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.client.SandeshaClient;
import org.apache.sandesha2.client.SandeshaClientConstants;
import org.apache.sandesha2.client.SandeshaListener;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -372,7 +373,10 @@
data.setSubcode(SpecSpecificConstants.getFaultSubcode(referenceRMMessage.getRMNamespaceValue(),
Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED ));
- data.setReason(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID));
+
+ String exceptionValue = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sequenceTerminatedFault, sequenceID);
+ data.setReason(exceptionValue);
+ data.setExceptionString(exceptionValue);
data.setType(Sandesha2Constants.SOAPFaults.FaultType.SEQUENCE_TERMINATED);
SOAPFactory factory = SOAPAbstractFactory.getSOAPFactory(SOAPVersion);
@@ -567,6 +571,13 @@
SandeshaListener listner = (SandeshaListener) rmMsgCtx.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
if (listner!=null)
listner.onError(fault);
+
+ //notifying the listeners of the sandeshaClient
+ Iterator listeners = SandeshaClient.getListeners();
+ while (listeners.hasNext()) {
+ SandeshaListener listener = (SandeshaListener) listeners.next();
+ listener.onError(fault);
+ }
// Get the SOAPVersion
SOAPFactory factory = (SOAPFactory) rmMsgCtx.getSOAPEnvelope().getOMFactory();
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/util/SequenceManager.java Thu Jun 7 07:51:45 2007
@@ -16,12 +16,15 @@
*/
package org.apache.sandesha2.util;
+import java.util.Iterator;
+
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.util.JavaUtils;
import org.apache.commons.logging.Log;
@@ -256,6 +259,10 @@
// Set the soap version use by this client
rmsBean.setSoapVersion(SandeshaUtil.getSOAPVersion(firstAplicationMsgCtx.getEnvelope()));
+ AxisService service = firstAplicationMsgCtx.getAxisService();
+ if (service!=null)
+ rmsBean.setServiceName(service.getName());
+
//setting the autoTermination property for the client side.
if (!firstAplicationMsgCtx.isServerSide()) {
Object avoidAutoTermination = firstAplicationMsgCtx.getProperty(SandeshaClientConstants.AVOID_AUTO_TERMINATION);
@@ -270,20 +277,20 @@
return rmsBean;
}
- public static boolean hasSequenceTimedOut(RMSBean rmsBean, String internalSequenceId, StorageManager storageManager)
+ public static boolean hasSequenceTimedOut(RMSBean rmsBean, SandeshaPolicyBean policyBean)
throws SandeshaException {
+
+// SandeshaPolicyBean propertyBean =
+// SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
- SandeshaPolicyBean propertyBean =
- SandeshaUtil.getPropertyBean(storageManager.getContext().getAxisConfiguration());
-
- if (propertyBean.getInactivityTimeoutInterval() <= 0)
+ if (policyBean.getInactivityTimeoutInterval() <= 0)
return false;
boolean sequenceTimedOut = false;
long lastActivatedTime = rmsBean.getLastActivatedTime();
long timeNow = System.currentTimeMillis();
- if (lastActivatedTime > 0 && (lastActivatedTime + propertyBean.getInactivityTimeoutInterval() < timeNow))
+ if (lastActivatedTime > 0 && (lastActivatedTime + policyBean.getInactivityTimeoutInterval() < timeNow))
sequenceTimedOut = true;
return sequenceTimedOut;
@@ -306,6 +313,17 @@
// Already an active transaction, so don't want a new one
TerminateManager.timeOutSendingSideSequence(internalSequenceID, storageManager);
+ //notifying listeners.
+
+ //listeners can be at the SandeshaClient or at individual messages.
+
+ Iterator listeners = SandeshaClient.getListeners();
+ while (listeners.hasNext()) {
+ SandeshaListener listener = (SandeshaListener) listeners.next();
+ SequenceReport report = SandeshaClient.getOutgoingSequenceReport(internalSequenceID, configurationContext, false);
+ listener.onTimeOut(report);
+ }
+
if (messageContext != null) {
SandeshaListener listener = (SandeshaListener) messageContext
.getProperty(SandeshaClientConstants.SANDESHA_LISTENER);
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Thu Jun 7 07:51:45 2007
@@ -58,14 +58,6 @@
msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
- // ending the transaction before invocation.
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
-
- //starting a transaction for the invocation work.
- transaction = storageManager.getTransaction();
// Lock the RMD Bean just to avoid deadlocks
SandeshaUtil.getRMDBeanFromSequenceId(storageManager, invokerBean.getSequenceID());
// Depending on the transaction support, the service will be invoked only once.
@@ -74,6 +66,12 @@
// removing the corresponding message context as well.
storageManager.removeMessageContext(messageContextKey);
+ // ending the transaction before invocation.
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+
try {
boolean postFailureInvocation = false;
@@ -102,10 +100,6 @@
engine.resumeReceive(msgToInvoke);
}
- if(transaction!=null){
- transaction.commit();
- transaction = storageManager.getTransaction();
- }
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception :", e);
@@ -113,7 +107,7 @@
handleFault(rmMsg, e);
}
-
+ transaction = storageManager.getTransaction();
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Thu Jun 7 07:51:45 2007
@@ -24,6 +24,7 @@
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.description.AxisService;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
import org.apache.axis2.wsdl.WSDLConstants;
@@ -97,7 +98,7 @@
// Finally, check for messages that can only be serviced by polling, and warn
// the user if they are too old
- checkForOrphanMessages(storageManager);
+// checkForOrphanMessages(storageManager);
if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
@@ -119,8 +120,14 @@
matcher.setTerminated(false);
RMSBean rms = storageManager.getRMSBeanMgr().findUnique(matcher);
if(rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
- sequenceId = rms.getSequenceID();
- if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
+ sequenceId = rms.getSequenceID();
+ String axisServiceName = rms.getServiceName();
+ AxisService service = storageManager.getContext().getAxisConfiguration().getService(axisServiceName);
+ SandeshaPolicyBean policyBean = null;
+ if (service!=null)
+ policyBean = SandeshaUtil.getPropertyBean(service);
+
+ if (policyBean!=null && SequenceManager.hasSequenceTimedOut(rms, policyBean))
SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
else
found = true;
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Thu Jun 7 07:51:45 2007
@@ -216,52 +216,89 @@
msgCtx.getOptions().setTimeOutInMilliSeconds(1000000);
try {
- AxisEngine engine = new AxisEngine (msgCtx.getConfigurationContext());
- InvocationResponse response = InvocationResponse.CONTINUE;
- SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
- if(policy.isUseMessageSerialization()) {
- if(msgCtx.isPaused()) {
- if (log.isDebugEnabled())
- log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
- msgCtx.setPaused(false);
- msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
- response = engine.resumeSend(msgCtx);
+ Boolean transportUsed = (Boolean) msgCtx.getProperty(Sandesha2Constants.TRANSOPORT_USED);
+
+
+ //we do the actual invocation only if the transport has not been used before
+ if (!Boolean.TRUE.equals(transportUsed)) {
+
+ AxisEngine engine = new AxisEngine(msgCtx
+ .getConfigurationContext());
+ InvocationResponse response = InvocationResponse.CONTINUE;
+
+ SandeshaPolicyBean policy = SandeshaUtil
+ .getPropertyBean(msgCtx.getAxisOperation());
+ if (policy.isUseMessageSerialization()) {
+ if (msgCtx.isPaused()) {
+ if (log.isDebugEnabled())
+ log.debug("Resuming a send for message : "
+ + msgCtx.getEnvelope().getHeader());
+ msgCtx.setPaused(false);
+ msgCtx.setProperty(
+ MessageContext.TRANSPORT_NON_BLOCKING,
+ Boolean.FALSE);
+ response = engine.resumeSend(msgCtx);
+ } else {
+ if (log.isDebugEnabled())
+ log.debug("Sending a message : "
+ + msgCtx.getEnvelope().getHeader());
+ msgCtx.setProperty(
+ MessageContext.TRANSPORT_NON_BLOCKING,
+ Boolean.FALSE);
+ engine.send(msgCtx); // TODO check if this should
+ // return an invocation response
+ }
} else {
+ // had to fully build the SOAP envelope to support
+ // retransmissions.
+ // Otherwise a 'parserAlreadyAccessed' exception could
+ // get thrown in retransmissions.
+ // But this has a performance reduction.
+ msgCtx.getEnvelope().build();
+
+ ArrayList retransmittablePhases = (ArrayList) msgCtx
+ .getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
+ if (retransmittablePhases != null) {
+ msgCtx.setExecutionChain(retransmittablePhases);
+ } else {
+ ArrayList emptyExecutionChain = new ArrayList();
+ msgCtx.setExecutionChain(emptyExecutionChain);
+ }
+
+ msgCtx.setCurrentHandlerIndex(0);
+ msgCtx.setCurrentPhaseIndex(0);
+ msgCtx.setPaused(false);
+
if (log.isDebugEnabled())
- log.debug("Sending a message : " + msgCtx.getEnvelope().getHeader());
- msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
- engine.send(msgCtx); // TODO check if this should return an invocation response
- }
- } else {
- // had to fully build the SOAP envelope to support
- // retransmissions.
- // Otherwise a 'parserAlreadyAccessed' exception could
- // get thrown in retransmissions.
- // But this has a performance reduction.
- msgCtx.getEnvelope().build();
-
- ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
- if (retransmittablePhases!=null) {
- msgCtx.setExecutionChain(retransmittablePhases);
- } else {
- ArrayList emptyExecutionChain = new ArrayList ();
- msgCtx.setExecutionChain(emptyExecutionChain);
+ log.debug("Resuming a send for message : "
+ + msgCtx.getEnvelope().getHeader());
+ System.out.println("resume send called....");
+ response = engine.resumeSend(msgCtx);
}
-
- msgCtx.setCurrentHandlerIndex(0);
- msgCtx.setCurrentPhaseIndex(0);
- msgCtx.setPaused(false);
-
+
if (log.isDebugEnabled())
- log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
- response = engine.resumeSend(msgCtx);
- }
- if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
- if(response != InvocationResponse.SUSPEND) {
- if(t != null) {
- if(log.isDebugEnabled()) log.debug("Signalling transport in " + t);
- t.signalResponseReady();
+ log.debug("Engine resume returned " + response);
+ if (response != InvocationResponse.SUSPEND) {
+ if (t != null) {
+ if (log.isDebugEnabled())
+ log.debug("Signalling transport in " + t);
+
+ // marking the inMessage as responded.
+ if (inMsg != null) {
+ inMsg.setProperty(Constants.RESPONSE_WRITTEN,
+ Constants.VALUE_TRUE);
+ }
+
+ if (msgCtx.getTo() != null
+ && msgCtx.getTo().hasAnonymousAddress()) {
+ msgCtx.setProperty(
+ Sandesha2Constants.TRANSOPORT_USED,
+ Boolean.TRUE);
+ }
+
+ t.signalResponseReady();
+ }
}
}
@@ -628,6 +665,22 @@
}
}
+
+ boolean processedBefore = false;
+ if (responseMessageContext!=null) {
+ synchronized (responseMessageContext) {
+ Boolean processed = (Boolean) responseMessageContext.getProperty(Sandesha2Constants.MESSAGE_PROCESSED);
+ if (Boolean.TRUE.equals(processed)) {
+ processedBefore = true;
+ } else {
+ responseMessageContext.setProperty(Sandesha2Constants.MESSAGE_PROCESSED, Boolean.TRUE);
+ }
+ }
+ }
+
+ //if this has been processed before, simply return.
+ if (processedBefore)
+ return;
//if the sync response was not built here and the client was not expecting a sync response, we will not try to execute
//here. Doing so will cause a double invocation for an async message.
Modified: webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/mar/module.xml
URL: http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/mar/module.xml?view=diff&rev=545205&r1=545204&r2=545205
==============================================================================
--- webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/mar/module.xml (original)
+++ webservices/sandesha/branches/sandesha2/java/temp/post_1_2/modules/mar/module.xml Thu Jun 7 07:51:45 2007
@@ -11,7 +11,7 @@
</handler>
<handler name="SequenceIDDispatcher" class="org.apache.sandesha2.handlers.SequenceIDDispatcher">
- <order phase="Dispatch" after="SandeshaGlobalInHandler" before="AddressingBasedDispatcher" />
+ <order phase="PreDispatch" after="SandeshaGlobalInHandler" before="AddressingBasedDispatcher" />
</handler>
<handler name="SandeshaInHandler" class="org.apache.sandesha2.handlers.SandeshaInHandler">
@@ -100,7 +100,7 @@
<sandesha2:ExponentialBackoff>false</sandesha2:ExponentialBackoff>
- <sandesha2:InactivityTimeout>60</sandesha2:InactivityTimeout>
+ <sandesha2:InactivityTimeout>6000</sandesha2:InactivityTimeout>
<sandesha2:InactivityTimeoutMeasure>seconds</sandesha2:InactivityTimeoutMeasure>
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org