You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ch...@apache.org on 2006/01/30 16:57:27 UTC
svn commit: r373537 - in
/webservices/sandesha/trunk/src/org/apache/sandesha2: ./ handlers/
msgprocessors/ storage/inmemory/ util/ workers/
Author: chamikara
Date: Mon Jan 30 07:55:20 2006
New Revision: 373537
URL: http://svn.apache.org/viewcvs?rev=373537&view=rev
Log:
Corrected the stopping of SimpleAxisServer after an RM sequence ends.
Modified:
webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/AcknowledgementManager.java Mon Jan 30 07:55:20 2006
@@ -87,15 +87,14 @@
}
String internalSequenceId = (String) internalSequenceBean.getValue();
- findBean.setInternalSequenceID(internalSequenceId);
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
findBean.setSend(true);
findBean.setReSend(false);
Collection collection = retransmitterBeanMgr.find(findBean);
+
Iterator it = collection.iterator();
if (it.hasNext()) {
-
SenderBean ackBean = (SenderBean) it.next();
long timeNow = System.currentTimeMillis();
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/Sandesha2Constants.java Mon Jan 30 07:55:20 2006
@@ -192,6 +192,8 @@
String NO_OF_OUTGOING_MSGS_ACKED = "NoOfOutGoingMessagesAcked";
String TRANSPORT_TO = "TransportTo";
+
+ String OUT_SEQ_ACKSTO = "OutSequenceAcksTo";
}
public interface SOAPVersion {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/TerminateManager.java Mon Jan 30 07:55:20 2006
@@ -22,6 +22,9 @@
import java.util.HashMap;
import java.util.Iterator;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.client.ListenerManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -176,7 +179,8 @@
* @param sequenceID
* @throws SandeshaException
*/
- public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID) throws SandeshaException {
+ public static void terminateSendingSide (ConfigurationContext configContext, String sequenceID,boolean serverSide) throws SandeshaException {
+
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext);
//TODO - remove folowing redundant transaction
@@ -186,6 +190,38 @@
SenderBeanMgr retransmitterBeanMgr = storageManager.getRetransmitterBeanMgr();
CreateSeqBeanMgr createSeqBeanMgr = storageManager.getCreateSeqBeanMgr();
+
+ if (!serverSide) {
+ //stopping the listener for the client side.
+
+ //SequencePropertyBean outGoingAcksToBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.OUT_SEQ_ACKSTO);
+
+ boolean stopListnerForAsyncAcks = false;
+ SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
+ if (internalSequenceBean!=null) {
+ String internalSequenceID = internalSequenceBean.getValue();
+ SequencePropertyBean acksToBean = sequencePropertyBeanMgr.retrieve(internalSequenceID,Sandesha2Constants.SequenceProperties.ACKS_TO_EPR);
+
+ if (acksToBean!=null) {
+ String acksTo = acksToBean.getValue();
+ if (acksTo!=null && !Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo)) {
+ stopListnerForAsyncAcks = true;
+ }
+ }
+ }
+
+ try {
+ //this removes the listener entry for receiving async acks.
+ if (stopListnerForAsyncAcks)
+ ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
+
+ //TODO stop listner for asyncControlMessages
+
+ } catch (AxisFault e) {
+ throw new SandeshaException (e.getMessage());
+ }
+ }
+
SequencePropertyBean internalSequenceBean = sequencePropertyBeanMgr.retrieve(sequenceID,Sandesha2Constants.SequenceProperties.INTERNAL_SEQUENCE_ID);
if (internalSequenceBean==null)
throw new SandeshaException ("TempSequence entry not found");
@@ -228,14 +264,6 @@
}
terminateSendingTransaction.commit();
-
- //asking the listner to stop.
- //if (clientSide)
-// try {
-// ListenerManager.stop(configContext,Constants.TRANSPORT_HTTP);
-// } catch (AxisFault e) {
-// throw new SandeshaException (e.getMessage());
-// }
SandeshaUtil.stopSenderForTheSequence(internalSequenceId);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Jan 30 07:55:20 2006
@@ -290,15 +290,6 @@
.getProperty(MessageContext.TRANSPORT_IN);
if (transportIn == null)
transportIn = org.apache.axis2.Constants.TRANSPORT_HTTP;
-
- // For receiving async Ack messages.
- // try {
- ListenerManager.makeSureStarted(transportIn, context);
- // } catch (AxisFault e) {
- // log.debug("Could not start listener...");
- // log.debug(e.getStackTrace());
- // }
-
} else if (acksTo == null && serverSide) {
String incomingSequencId = SandeshaUtil
.getServerSideIncomingSeqIdFromInternalSeqId(internalSequenceId);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Jan 30 07:55:20 2006
@@ -123,7 +123,6 @@
rmMsgCtx.setRelatesTo(null);
SenderBean input = new SenderBean();
- input.setInternalSequenceID(internalSequenceId);
input.setSend(true);
input.setReSend(true);
Collection retransmitterEntriesOfSequence = retransmitterMgr
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Jan 30 07:55:20 2006
@@ -411,7 +411,6 @@
//the internalSequenceId value of the retransmitter Table for the
// messages related to an incoming
//sequence is the actual sequence ID
- ackBean.setInternalSequenceID(sequenceId);
// RMPolicyBean policyBean = (RMPolicyBean) rmMsgCtx
// .getProperty(Sandesha2Constants.WSP.RM_POLICY_BEAN);
@@ -442,7 +441,6 @@
//removing old acks.
SenderBean findBean = new SenderBean();
findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
- findBean.setInternalSequenceID(sequenceId);
findBean.setSend(true);
findBean.setReSend(false);
Collection coll = retransmitterBeanMgr.find(findBean);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java Mon Jan 30 07:55:20 2006
@@ -84,7 +84,7 @@
public Collection find(SenderBean bean) {
ArrayList beans = new ArrayList();
- Iterator iterator = table.values().iterator();
+ Iterator iterator = ((Hashtable) table).values().iterator();
SenderBean temp;
while (iterator.hasNext()) {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SandeshaUtil.java Mon Jan 30 07:55:20 2006
@@ -621,9 +621,9 @@
return Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX + ":" + to + ":" +sequenceKey;
}
-// public static String getServerSideInternalSeqID (String incomingSeqId) {
-// return (Sandesha2Constants.SANDESHA2_INTERNAL_SEQUENCE_ID + ":" + incomingSeqId);
-// }
+ /**
+  * Builds an internal sequence ID by joining the internal sequence prefix
+  * and the given sequence ID with a colon.
+  *
+  * @param sequenceID the sequence ID to be prefixed
+  * @return the prefix, a colon and the sequence ID as one string
+  */
+ public static String getInternalSequenceID (String sequenceID) {
+     StringBuffer internalId = new StringBuffer();
+     internalId.append(Sandesha2Constants.INTERNAL_SEQUENCE_PREFIX);
+     internalId.append(":");
+     internalId.append(sequenceID);
+     return internalId.toString();
+ }
public static String getSequenceIDFromInternalSequenceID (String internalSequenceID, ConfigurationContext configurationContext) throws SandeshaException {
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/util/SequenceManager.java Mon Jan 30 07:55:20 2006
@@ -10,10 +10,13 @@
import java.util.StringTokenizer;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ListenerManager;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.MessageContextConstants;
+import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.Parameter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -188,6 +191,50 @@
SandeshaUtil.startSenderForTheSequence(configurationContext,internalSequenceId);
+ updateClientSideListnerIfNeeded (firstAplicationMsgCtx);
+
+ }
+
+ /**
+  * Makes sure the incoming transport listener is started when the given
+  * client-side message expects asynchronous replies. Server-side messages
+  * are ignored.
+  *
+  * A listener is requested when the AcksTo property is a non-anonymous
+  * address (async acks) and/or when the operation's MEP is not out-only
+  * (async createSeqResponse and terminateSeq messages).
+  *
+  * @param messageContext the first application message of the sequence
+  * @throws SandeshaException if a listener is needed but the
+  *         TransportInProtocol is null, or if the listener cannot be started
+  */
+ private static void updateClientSideListnerIfNeeded (MessageContext messageContext) throws SandeshaException {
+     if (messageContext.isServerSide())
+         return; //listeners are updated only for the client side.
+
+     String transportInProtocol = messageContext.getOptions().getTransportInProtocol();
+
+     String acksTo = (String) messageContext.getProperty(Sandesha2ClientAPI.AcksTo);
+     String mep = messageContext.getAxisOperation().getMessageExchangePattern();
+
+     //a non-anonymous acksTo address means acks come back asynchronously.
+     boolean startListnerForAsyncAcks = acksTo!=null && !Sandesha2Constants.WSA.NS_URI_ANONYMOUS.equals(acksTo);
+
+     //For async createSeqRes & terminateSeq.
+     boolean startListnerForAsyncControlMsgs = mep!=null && !AxisOperation.MEP_URI_OUT_ONLY.equals(mep);
+
+     if (!startListnerForAsyncAcks && !startListnerForAsyncControlMsgs)
+         return; //no async traffic expected, so no listener is needed.
+
+     //Checked outside the try block so that this specific message cannot be
+     //intercepted by the AxisFault handler below.
+     //NOTE(review): matters only if SandeshaException is an AxisFault subtype - confirm.
+     if (transportInProtocol==null)
+         throw new SandeshaException ("Cant start the listner since the TransportInProtocol is null");
+
+     ConfigurationContext configurationContext = messageContext.getConfigurationContext();
+
+     try {
+         //The two calls are kept separate, one per reason, as before.
+         //NOTE(review): presumably ListenerManager counts registrations per
+         //transport and stop() releases one - confirm before merging them.
+         if (startListnerForAsyncAcks)
+             ListenerManager.makeSureStarted(transportInProtocol,configurationContext);
+
+         if (startListnerForAsyncControlMsgs)
+             ListenerManager.makeSureStarted(transportInProtocol,configurationContext);
+
+     } catch (AxisFault e) {
+         String message = "Cant start the listner for incoming messages";
+         //log the fault itself; logging e.getStackTrace() only printed the array reference.
+         log.error(message, e);
+         throw new SandeshaException (message);
+     }
+ }
/**
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/InOrderInvoker.java Mon Jan 30 07:55:20 2006
@@ -197,8 +197,8 @@
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
if (sequence.getLastMessage() != null) {
- TerminateManager.cleanReceivingSideAfterInvocation(
- context, sequenceId);
+ TerminateManager.cleanReceivingSideAfterInvocation(context, sequenceId);
+
//this sequence has no more invocations
stopInvokerForTheSequence(sequenceId);
Modified: webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewcvs/webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java?rev=373537&r1=373536&r2=373537&view=diff
==============================================================================
--- webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/src/org/apache/sandesha2/workers/Sender.java Mon Jan 30 07:55:20 2006
@@ -172,7 +172,7 @@
//do time out processing.
//TODO uncomment below line
- //TerminateManager.terminateSendingSide(context,sequenceID);
+ TerminateManager.terminateSendingSide(context,sequenceID,msgCtx.isServerSide());
String message = "Sequence timed out";
log.debug(message);
@@ -185,14 +185,8 @@
.piggybackAckIfPresent(rmMsgCtx);
}
-
-
preSendTransaction.commit();
- if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
- int i =1;
- }
-
try {
//every message should be resumed (pause==false) when sending
boolean paused = msgCtx.isPaused();
@@ -258,8 +252,7 @@
ConfigurationContext configContext = msgCtx
.getConfigurationContext();
- TerminateManager.terminateSendingSide(
- configContext, sequenceID);
+ TerminateManager.terminateSendingSide(configContext, sequenceID,msgCtx.isServerSide());
//removing a entry from the Listener
String transport = msgCtx.getTransportOut().getName().getLocalPart();
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org