You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/09/11 08:55:52 UTC

svn commit: r574490 - in /webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2: handlers/ msgprocessors/ polling/ storage/ storage/inmemory/ util/ workers/

Author: mckierna
Date: Mon Sep 10 23:55:50 2007
New Revision: 574490

URL: http://svn.apache.org/viewvc?rev=574490&view=rev
Log:
performance improvements

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java Mon Sep 10 23:55:50 2007
@@ -84,10 +84,11 @@
 				return returnValue ;
 			}
 		}
-		
+
+	    StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 		//this will change the execution chain of this message to work correctly in retransmissions.
 		//For e.g. Phases like security will be removed to be called in each retransmission.
-		SandeshaUtil.modifyExecutionChainForStoring(msgCtx);
+	    SandeshaUtil.modifyExecutionChainForStoring(msgCtx, storageManager);
 
 		String DONE = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
 		if (null != DONE && "true".equals(DONE)) {
@@ -97,7 +98,6 @@
 		}
 		
 		msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
-		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
 
 		Transaction transaction = null;
 
@@ -126,7 +126,7 @@
 			}
 
 			if (msgProcessor != null){
-				if(msgProcessor.processOutMessage(rmMsgCtx)){
+		        if(msgProcessor.processOutMessage(rmMsgCtx, transaction)){
 					//the msg was paused
 					returnValue = InvocationResponse.SUSPEND;
 				}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Sep 10 23:55:50 2007
@@ -266,7 +266,7 @@
 			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 			
 			// passing the message through sandesha2sender
-			SandeshaUtil.executeAndStore(ackRMMsgCtx, key);
+		    SandeshaUtil.executeAndStore(ackRMMsgCtx, key, storageManager);
 
 			// inserting the new Ack.
 			senderBeanMgr.insert(ackBean);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Mon Sep 10 23:55:50 2007
@@ -116,6 +116,12 @@
 		String outSequenceId = sequenceAck.getIdentifier().getIdentifier();
 		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, outSequenceId);
 
+		if(rmsBean==null){
+		  if (log.isDebugEnabled())
+			  log.debug("Exit: AcknowledgementProcessor::processAckHeader, Sequence bean not found");
+		  return;
+		}
+		
 		if (outSequenceId == null || "".equals(outSequenceId)) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outSeqIDIsNull);
 			log.debug(message);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -51,6 +51,9 @@
 import org.apache.sandesha2.util.SOAPAbstractFactory;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
 import org.apache.sandesha2.wsrm.CreateSequence;
 import org.apache.sandesha2.wsrm.SequenceOffer;
 
@@ -82,7 +85,7 @@
 		return false;
 	}
 	
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction tran) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
 
@@ -252,7 +255,7 @@
 				// server and the client sides.
 				if (rmsBean == null) {
 					rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
-					rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
+					rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager, tran);
 				}
 			}
 		
@@ -405,7 +408,7 @@
 		
 		// processing the response if not an dummy.
 		if (!dummyMessage)
-			processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager);
+			processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager, tran);
 		
 		//Users wont be able to get reliable response msgs in the back channel in the back channel of a 
 		//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
@@ -417,7 +420,7 @@
 	}
 
 	private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
-			StorageManager storageManager) throws AxisFault {
+			StorageManager storageManager, Transaction tran) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
@@ -484,7 +487,7 @@
 
 		createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 		
-		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey);
+		SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
 
 		retransmitterMgr.insert(createSeqEntry);
 
@@ -497,7 +500,7 @@
 	}
 
 	private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
-			String storageKey, StorageManager storageManager) throws AxisFault {
+		    String storageKey, StorageManager storageManager, Transaction tran) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
 
@@ -521,6 +524,11 @@
 			}
 		}
 
+		boolean sendingNow = false;
+		if(outSequenceID != null && !storageManager.hasUserTransaction(msg)) {
+		  sendingNow = true;
+		}
+		
 		// Now that we have decided which sequence to use for the message, make sure that we secure
 		// it with the correct token.
 		RMMsgCreator.secureOutboundMessage(rmsBean, msg);
@@ -563,11 +571,35 @@
 		// increasing the current handler index, so that the message will not be
 		// going throught the SandeshaOutHandler again.
 		msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
-
-		SandeshaUtil.executeAndStore(rmMsg, storageKey);
+		SandeshaUtil.executeAndStore(rmMsg, storageKey, storageManager);
+		
+		// Lock the sender bean before we insert it, if we are planning to send it ourselves
+		SenderWorker worker = null;
+		if(sendingNow) {
+		  String workId = appMsgEntry.getMessageID() + appMsgEntry.getTimeToSend();
+		  SandeshaThread sender = storageManager.getSender();
+		  ConfigurationContext context = msg.getConfigurationContext();
+		  WorkerLock lock = sender.getWorkerLock();
+      
+		  worker = new SenderWorker(context, appMsgEntry, rmsBean.getRMVersion());
+		  worker.setLock(lock);
+		  worker.setWorkId(workId);
+		  // Actually take the lock
+		  lock.addWork(workId, worker);
+		}
 
 		retransmitterMgr.insert(appMsgEntry);
 
+		// Commit the transaction, so that the sender worker starts with a clean slate.
+		if(tran != null && tran.isActive()) tran.commit();
+		 
+		if(worker != null) {
+		  try {
+		    worker.run();
+		  } catch(Exception e)  {
+		    log.error("Caught exception running SandeshaWorker", e);
+		  }
+		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
 	}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Sep 10 23:55:50 2007
@@ -141,7 +141,7 @@
 		return false;
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	 public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled()) 
 			log.debug("Enter: CloseSequenceProcessor::processOutMessage");
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -343,7 +343,7 @@
 		return true;
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: CreateSeqMsgProcessor::processOutMessage");
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -262,7 +262,7 @@
 		return true;
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		if (log.isDebugEnabled()) {
 			log.debug("Enter: CreateSeqResponseMsgProcessor::processOutMessage");
 			log.debug("Exit: CreateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/LastMessageProcessor.java Mon Sep 10 23:55:50 2007
@@ -51,7 +51,7 @@
 		return true;
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		return false;
 	}
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java Mon Sep 10 23:55:50 2007
@@ -211,7 +211,7 @@
 		messagePending.toSOAPEnvelope(returnMessage.getEnvelope());
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		return false;
 	}
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -40,5 +40,5 @@
 	 * @return true if the msg context has been paused
 	 * @throws AxisFault
 	 */
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault;
-}
\ No newline at end of file
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction)throws AxisFault;
+}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java Mon Sep 10 23:55:50 2007
@@ -28,6 +28,7 @@
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.context.OperationContext;
 import org.apache.axis2.engine.Handler.InvocationResponse;
+import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.transport.TransportUtils;
 import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
@@ -345,6 +346,18 @@
 			RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId, storageManager,true);
 
 			AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, timeToSend, storageManager);
+			
+			// If the MEP doesn't need the backchannel, and nor do we, we should signal it so that it
+			// can close off as soon as possible.
+			result = InvocationResponse.ABORT;
+			RequestResponseTransport t = null;
+			t = (RequestResponseTransport) rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+			
+			// Tell the transport that there will be no response message
+			if(t != null) {
+				TransportUtils.setResponseWritten(msgCtx, false);
+				t.acknowledgeMessage(msgCtx);
+			}
 		}
 		
 		// If this message matches the WSRM 1.0 pattern for an empty last message (e.g.

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -349,7 +349,7 @@
 		return terminateSeqResponseRMMsg;
 	}
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) throws AxisFault {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
 
 		if (log.isDebugEnabled())
 			log.debug("Enter: TerminateSeqMsgProcessor::processOutMessage");

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Mon Sep 10 23:55:50 2007
@@ -89,7 +89,7 @@
 		return true;
   }
 
-	public boolean processOutMessage(RMMsgContext rmMsgCtx) {
+	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction transaction) {
 		if(log.isDebugEnabled()) log.debug("Enter: TerminateSeqResponseMsgProcessor::processOutMessage");
 		if(log.isDebugEnabled()) log.debug("Exit: TerminateSeqResponseMsgProcessor::processOutMessage " + Boolean.FALSE);
 		return false;

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/polling/PollingManager.java Mon Sep 10 23:55:50 2007
@@ -149,17 +149,18 @@
 			// This sequence must have been terminated, or deleted
 			stopThreadForSequence(entry.getSequenceId(), true);
 		} else {
-      if (log.isDebugEnabled())
-        log.debug("Polling rms " + beanToPoll);
+			if (log.isDebugEnabled())
+				log.debug("Polling rms " + beanToPoll);
 			// The sequence is there, but we still only poll if we are expecting reply messages,
 			// or if we don't have clean ack state.
-      boolean cleanAcks = false;
-      if (beanToPoll.getNextMessageNumber() > -1)
-      	cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
+			boolean cleanAcks = false;
+			if (beanToPoll.getNextMessageNumber() > -1)
+				cleanAcks = AcknowledgementManager.verifySequenceCompletion(beanToPoll.getClientCompletedMessages(), beanToPoll.getNextMessageNumber());
 			long  repliesExpected = beanToPoll.getExpectedReplies();
-			if((force ||	!cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null)
-				pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
+			if(beanToPoll.getSequenceID() != null && (force || !cleanAcks || repliesExpected > 0) && beanToPoll.getReferenceMessageStoreKey() != null)
+		            pollForSequence(beanToPoll.getAnonymousUUID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll, entry);
 		}
+		
 
 		if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
 	}
@@ -262,7 +263,7 @@
 			//this message should not be sent until it is qualified. I.e. till it is sent through the Sandesha2TransportSender.
 			makeConnectionRMMessage.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
 			
-			SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey);
+	        SandeshaUtil.executeAndStore(makeConnectionRMMessage, makeConnectionMsgStoreKey, storageManager);
 			
 			senderBeanMgr.insert(makeConnectionSenderBean);			
 		}

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/StorageManager.java Mon Sep 10 23:55:50 2007
@@ -100,5 +100,16 @@
 	public abstract MessageContext retrieveMessageContext (String storageKey, ConfigurationContext configContext) throws SandeshaStorageException;
 
 	public abstract void removeMessageContext (String storageKey) throws SandeshaStorageException;
-
+	
+	
+	/**
+	 * If there is no user transaction in scope then we can optimize the sending / invoking of a
+	 * message. This method allows the StorageManager to tell the core Sandesha code if there
+	 * is a transaction in scope.
+	 * @return true, if there is a user transaction in scope.
+	 */
+	public abstract boolean hasUserTransaction(MessageContext message) throws SandeshaStorageException;
+	 
+	public abstract boolean requiresMessageSerialization();
+	
 }

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Mon Sep 10 23:55:50 2007
@@ -338,6 +338,15 @@
 	public void  initStorage (AxisModule moduleDesc) {
 		
 	}
+	
+	//We do not support user transactions in-memory
+	public boolean hasUserTransaction(MessageContext msg) {
+		return false;
+	}
+	  
+	public boolean requiresMessageSerialization() {
+		return useSerialization;
+	}
 
 	private class SerializedStorageEntry {
 		MessageContext       message;
@@ -355,6 +364,7 @@
 		SOAPEnvelope   envelope;
 	}
 }
+
 
 
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java Mon Sep 10 23:55:50 2007
@@ -18,9 +18,7 @@
 package org.apache.sandesha2.util;
 
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Set;
 
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
@@ -106,63 +104,48 @@
 			if(log.isDebugEnabled()) log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
 			return transaction;
 		}
-		
-		// From here on, we must be dealing with a real address. Piggyback all sequences that have an
-		// acksTo that matches the To address, and that have an ackMessage queued up for sending.
-		Set acked = new HashSet();
-		SenderBean findBean = new SenderBean();
-		findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
-		findBean.setSend(true);
-		findBean.setToAddress(target.getAddress());
-
-		Collection collection = retransmitterBeanMgr.find(findBean);
-		
-		if (transaction != null && transaction.isActive())
-			transaction.commit();
-		
-		transaction = storageManager.getTransaction();
-		
-		Iterator it = collection.iterator();
-		while (it.hasNext()) {
-			SenderBean ackBean = (SenderBean) it.next();
-
+	    // From here on, we must be dealing with a real address. Piggyback all sequences that have an
+	    // acksTo that matches the To address, and that have an ackMessage queued up for sending. We
+	    // search for RMDBeans first, to avoid a deadlock.
+	    //
+	    // As a special case, if this is a terminate sequence message then add in ack messages for
+	    // any sequences that have an acksTo that matches the target address. This helps to ensure
+	    // that request-response sequence pairs end cleanly.
+	    RMDBean findRMDBean = new RMDBean();
+	    findRMDBean.setAcksToEPR(target.getAddress());
+	    findRMDBean.setTerminated(false);
+	    Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
+	    Iterator sequences = rmdBeans.iterator();
+	    while(sequences.hasNext()) {
+	      RMDBean sequence = (RMDBean) sequences.next();
+	      String sequenceId = sequence.getSequenceID();
+	      
+	      // Look for the SenderBean that carries the ack, there should be at most one
+	      SenderBean findBean = new SenderBean();
+	      findBean.setMessageType(Sandesha2Constants.MessageTypes.ACK);
+	      findBean.setSend(true);
+	      findBean.setSequenceID(sequenceId);
+	      findBean.setToAddress(target.getAddress());
+	      
+	      SenderBean ackBean = retransmitterBeanMgr.findUnique(findBean);
+	      
 			// Piggybacking will happen only if the end of ack interval (timeToSend) is not reached.
 			long timeNow = System.currentTimeMillis();
-			if (ackBean.getTimeToSend() > timeNow) {
+		    if (ackBean != null && ackBean.getTimeToSend() > timeNow) {
 				// Delete the beans that would have sent the ack
 				retransmitterBeanMgr.delete(ackBean.getMessageID());
 				storageManager.removeMessageContext(ackBean.getMessageContextRefKey());
 
-				String sequenceId = ackBean.getSequenceID();
 				if (log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
+		        RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence);
+
+		    } else if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
+		        if(log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
+		          
+		        if(sequence.getHighestInMessageNumber() > 0) {
+					  if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
 
-				RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
-				if(rmdBean != null && !rmdBean.isTerminated()) {
-					RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, rmdBean);
-				}
-				acked.add(sequenceId);
-			}
-		}
-		
-		// As a special case, if this is a terminate sequence message then add in ack messages for
-		// any sequences that have an acksTo that matches the target address. This helps to ensure
-		// that request-response sequence pairs end cleanly.
-		if(rmMessageContext.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
-			if(log.isDebugEnabled()) log.debug("Adding extra acks, as this is a terminate");
-			
-			RMDBean findRMDBean = new RMDBean();
-			findRMDBean.setAcksToEPR(target.getAddress());
-			findRMDBean.setTerminated(false);
-			Collection rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
-			Iterator sequences = rmdBeans.iterator();
-			while(sequences.hasNext()) {
-				RMDBean sequence = (RMDBean) sequences.next();
-				String sequenceId = sequence.getSequenceID();
-				
-				if(!acked.contains(sequenceId) && sequence.getHighestInMessageNumber() > 0) {
-					if(log.isDebugEnabled()) log.debug("Piggybacking ack for sequence: " + sequenceId);
 					RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence);
-					acked.add(sequenceId);
 				}
 			}
 		}
@@ -313,7 +296,7 @@
 		// passing the message through sandesha2sender
 		ackMsgContext.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
 		
-		SandeshaUtil.executeAndStore(ackRMMsgContext, key);
+	    SandeshaUtil.executeAndStore(ackRMMsgContext, key, storageManager);
 
 		// inserting the new ack.
 		retransmitterBeanMgr.insert(ackBean);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java Mon Sep 10 23:55:50 2007
@@ -25,6 +25,7 @@
 import java.util.Map;
 
 import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
 
 import org.apache.axiom.om.OMElement;
@@ -883,7 +884,7 @@
 		else 
 			return false;
 	}
-	public static void executeAndStore (RMMsgContext rmMsgContext, String storageKey) throws AxisFault {
+	 public static void executeAndStore (RMMsgContext rmMsgContext, String storageKey, StorageManager manager) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: SandeshaUtil::executeAndStore, " + storageKey);
 		
@@ -891,7 +892,7 @@
 		ConfigurationContext configurationContext = msgContext.getConfigurationContext();
 
 		SandeshaPolicyBean policy = getPropertyBean(msgContext.getAxisOperation());
-		if(policy.isUseMessageSerialization()) {
+	    if(manager.requiresMessageSerialization()) {
 			msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_TRUE);
 
 			StorageManager store = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration());
@@ -908,27 +909,26 @@
 	
 			Sandesha2TransportOutDesc sandesha2TransportOutDesc = new Sandesha2TransportOutDesc();
 			msgContext.setTransportOut(sandesha2TransportOutDesc);
+			
+			//this invocation has to be a blocking one.
+			Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
+			if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
+				msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
 	
 	 		// sending the message once through Sandesha2TransportSender.
 			if (msgContext.isPaused())
 				AxisEngine.resumeSend(msgContext);
 			else {
-				//this invocation has to be a blocking one.
-				
-				Boolean isTransportNonBlocking = (Boolean) msgContext.getProperty(MessageContext.TRANSPORT_NON_BLOCKING);
-				if (isTransportNonBlocking!=null && isTransportNonBlocking.booleanValue())
-					msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
-				
-				AxisEngine.send(msgContext);
-				
-				msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
+				AxisEngine.send(msgContext);	
 			}
+			//put the original value of isTransportNonBlocking back on
+			msgContext.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, isTransportNonBlocking);
 		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: SandeshaUtil::executeAndStore");
 	}
 	
-	public static void modifyExecutionChainForStoring (MessageContext message)
+	public static void modifyExecutionChainForStoring (MessageContext message, StorageManager manager)
 	throws SandeshaException
 	{
 		
@@ -936,8 +936,7 @@
 		if (property!=null)
 			return; //Phases are already set. Dont hv to redo.
 		
-		SandeshaPolicyBean policy = getPropertyBean(message.getAxisOperation());
-		if(policy.isUseMessageSerialization())
+	    if(manager.requiresMessageSerialization())
 			return; // No need to mess with the transport when we use message serialization
 		
 		TransportOutDescription transportOutDescription = message.getTransportOut();
@@ -1120,40 +1119,53 @@
 	}	
 	
 	
-	public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws SandeshaException {
-		
-		// Now clone the env and set it in the message context
-		XMLStreamReader streamReader = envelope.cloneOMElement().getXMLStreamReader();
-		SOAPEnvelope clonedEnvelope = new StAXSOAPModelBuilder(streamReader, null).getSOAPEnvelope();
-
-		// you have to explicitely set the 'processed' attribute for header
-		// blocks, since it get lost in the above read from the stream.
-
-		SOAPHeader header = envelope.getHeader();
-		if (header != null) {
-			Iterator childrenOfOldEnv = header.getChildElements();
-			Iterator childrenOfNewEnv = clonedEnvelope.getHeader().getChildElements();
-			while (childrenOfOldEnv.hasNext()) {
-				
-				SOAPHeaderBlock oldEnvHeaderBlock = (SOAPHeaderBlock) childrenOfOldEnv.next();
-				SOAPHeaderBlock newEnvHeaderBlock = (SOAPHeaderBlock) childrenOfNewEnv.next();
+  public static SOAPEnvelope cloneEnvelope(SOAPEnvelope envelope) throws SandeshaException {
+	    
+	    // Now clone the env and set it in the message context. We need to be sure that we
+    // close off the stream reader, in order to free up some of the heap.
+    XMLStreamReader streamReader = null;
+    SOAPEnvelope clonedEnvelope = null;
+    try {
+      streamReader = envelope.getXMLStreamReader();     
+      clonedEnvelope = new StAXSOAPModelBuilder(streamReader, null).getSOAPEnvelope();
+      // you have to explicitly set the 'processed' attribute for header
+      // blocks, since it gets lost in the above read from the stream.
+     
+      SOAPHeader header = envelope.getHeader();
+      if (header != null) {
+        Iterator childrenOfOldEnv = header.getChildElements();
+        Iterator childrenOfNewEnv = clonedEnvelope.getHeader().getChildElements();
+        while (childrenOfOldEnv.hasNext()) {        
+          SOAPHeaderBlock oldEnvHeaderBlock = (SOAPHeaderBlock) childrenOfOldEnv.next();
+          SOAPHeaderBlock newEnvHeaderBlock = (SOAPHeaderBlock) childrenOfNewEnv.next();
+
+          QName oldEnvHeaderBlockQName = oldEnvHeaderBlock.getQName();
+          if (oldEnvHeaderBlockQName != null) {
+            if (oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
+              if (oldEnvHeaderBlock.isProcessed())
+                newEnvHeaderBlock.setProcessed();
+              } else {
+                String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
+                throw new SandeshaException(message);
+              }
+            }
+          }
+        }
+        // Completely build the new tree
+        clonedEnvelope.build();
+      } finally {
+      if(streamReader != null) {
+        try {
+          streamReader.close();
+        } catch(XMLStreamException e) {
+          log.debug("Caught exception closing stream", e);
+        }
+      }
 
-				QName oldEnvHeaderBlockQName = oldEnvHeaderBlock.getQName();
-				if (oldEnvHeaderBlockQName != null) {
-					if (oldEnvHeaderBlockQName.equals(newEnvHeaderBlock.getQName())) {
-						if (oldEnvHeaderBlock.isProcessed())
-							newEnvHeaderBlock.setProcessed();
-					} else {
-						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cloneDoesNotMatchToOriginal);
-						throw new SandeshaException(message);
-					}
-				}
-			}
-		}
-		
-		return clonedEnvelope;
-	}
-	
+    }
+    return clonedEnvelope;
+  }
+	  
 	public static final String getStackTraceFromException(Exception e) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     PrintWriter pw = new PrintWriter(baos);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java Mon Sep 10 23:55:50 2007
@@ -366,7 +366,7 @@
 //		terminateRMMessage.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_PROPERTY_KEY, sequencePropertyKey);
 		
 		// / addTerminateSeqTransaction.commit();
-		SandeshaUtil.executeAndStore(terminateRMMessage, key);
+	    SandeshaUtil.executeAndStore(terminateRMMessage, key, storageManager);
 		
 		SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Sep 10 23:55:50 2007
@@ -167,7 +167,7 @@
 
 		SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
 		
-		SandeshaUtil.executeAndStore(rmMsgCtx, key);
+	    SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
 	
 		retramsmitterMgr.insert(senderBean);
 		

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java Mon Sep 10 23:55:50 2007
@@ -124,7 +124,7 @@
 					
 						//adding the workId to the lock after assigning it to a thread makes sure 
 						//that all the workIds in the Lock are handled by threads.
-						getWorkerLock().addWork(workId);
+			            getWorkerLock().addWork(workId, worker);
 
 						long msgNumber = invoker.getMsgNo();
 						//if necessary, update the "next message number" bean under this transaction
@@ -349,12 +349,14 @@
 				if(contextMgr != null) {
 					work = contextMgr.wrapWithContext(work, bean.getContext());
 				}
-				threadPool.execute(work);
-				
-				//adding the workId to the lock after assigning it to a thread makes sure 
-				//that all the workIds in the Lock are handled by threads.
-				getWorkerLock().addWork(workId);
-				
+				try {
+					// Set the lock up before we start the thread, but roll it back
+					// if we hit any problems
+					getWorkerLock().addWork(workId, worker);
+					threadPool.execute(work);
+				} catch(Exception e) {
+					getWorkerLock().removeWork(workId);
+				}				
 				processedMessage = true;
 			}
 			

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java Mon Sep 10 23:55:50 2007
@@ -1,5 +1,7 @@
 package org.apache.sandesha2.workers;
 
+import org.apache.axiom.om.impl.builder.StAXBuilder;
+import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
@@ -7,6 +9,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.engine.Handler.InvocationResponse;
 import org.apache.axis2.transport.RequestResponseTransport;
 import org.apache.axis2.util.MessageContextBuilder;
 import org.apache.commons.logging.Log;
@@ -85,19 +88,30 @@
 						&& Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
 					postFailureInvocation = true;
 
+		        InvocationResponse response = null;
 				if (postFailureInvocation) {
 					makeMessageReadyForReinjection(msgToInvoke);
 					if (log.isDebugEnabled())
 						log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
 								+ msgToInvoke.getEnvelope().getHeader());
-					AxisEngine.receive(msgToInvoke);
+					response = AxisEngine.receive(msgToInvoke);
 				} else {
 					if (log.isDebugEnabled())
 						log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
 								+ msgToInvoke.getEnvelope().getHeader());
 					msgToInvoke.setPaused(false);
-					AxisEngine.resumeReceive(msgToInvoke);
+					response = AxisEngine.resumeReceive(msgToInvoke);
 				}
+		        if(!InvocationResponse.SUSPEND.equals(response)) {
+		            // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+		            SOAPEnvelope env = msgToInvoke.getEnvelope();
+		            if(env!=null){
+		              StAXBuilder sb = (StAXBuilder)msgToInvoke.getEnvelope().getBuilder();
+		              if(sb!=null){
+		                sb.close();
+		              }
+		            }
+		        }
 
 			} catch (Exception e) {
 				if (log.isDebugEnabled())
@@ -105,7 +119,6 @@
 
 				handleFault(rmMsg, e);
 			}
-
 
 			transaction = storageManager.getTransaction();
 			 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Sep 10 23:55:50 2007
@@ -131,8 +131,10 @@
 			}
 		}
 		
-		if (log.isDebugEnabled())
-			log.debug("Exit: SandeshaThread::stopRunning, " + this);
+	    // In a unit test, tracing 'this' once the thread was stopped caused
+	    // an exception, so we just trace exit.
+	    if (log.isDebugEnabled())
+	      log.debug("Exit: SandeshaThread::stopRunning");
 	}
 	
 	public synchronized boolean isThreadStarted() {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java Mon Sep 10 23:55:50 2007
@@ -200,12 +200,15 @@
 			SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
 			worker.setLock(getWorkerLock());
 			worker.setWorkId(workId);
-			threadPool.execute(worker);
-
-			// adding the workId to the lock after assigning it to a thread
-			// makes sure
-			// that all the workIds in the Lock are handled by threads.
-			getWorkerLock().addWork(workId);
+			
+			try {
+				// Set the lock up before we start the thread, but roll it back
+				// if we hit any problems
+				getWorkerLock().addWork(workId, worker);
+				threadPool.execute(worker);
+			} catch(Exception e) {
+				getWorkerLock().removeWork(workId);
+			}			
 
 			// If we got to here then we found work to do on the sequence, so we should
 			// remember not to sleep at the end of the list of sequences.

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java Mon Sep 10 23:55:50 2007
@@ -4,6 +4,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 
+import org.apache.axiom.om.impl.builder.StAXBuilder;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
@@ -76,6 +77,12 @@
 		
 		if (log.isDebugEnabled())
 			log.debug("Enter: SenderWorker::run");
+		
+	    // If we are not the holder of the correct lock, then we have to stop
+	    if(lock != null && !lock.ownsLock(workId, this)) {
+	      if (log.isDebugEnabled()) log.debug("Exit: SenderWorker::run, another worker holds the lock");
+	      return;
+	    }
 
 		Transaction transaction = null;
 		
@@ -253,8 +260,7 @@
 			try {
 				InvocationResponse response = InvocationResponse.CONTINUE;
 				
-				SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgCtx.getAxisOperation());
-				if(policy.isUseMessageSerialization()) {
+		        if(storageManager.requiresMessageSerialization()) {
 					if(msgCtx.isPaused()) {
 						if (log.isDebugEnabled())
 							log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
@@ -268,12 +274,6 @@
 						AxisEngine.send(msgCtx);  // TODO check if this should return an invocation response
 					}
 				} else {
-					// had to fully build the SOAP envelope to support
-					// retransmissions.
-					// Otherwise a 'parserAlreadyAccessed' exception could
-					// get thrown in retransmissions.
-					// But this has a performance reduction.
-					msgCtx.getEnvelope().build();
 	
 					ArrayList retransmittablePhases = (ArrayList) msgCtx.getProperty(Sandesha2Constants.RETRANSMITTABLE_PHASES);
 					if (retransmittablePhases!=null) {
@@ -289,6 +289,7 @@
 				
 					if (log.isDebugEnabled())
 						log.debug("Resuming a send for message : " + msgCtx.getEnvelope().getHeader());
+			        msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING, Boolean.FALSE);
 					response = AxisEngine.resumeSend(msgCtx);
 				}
 				if(log.isDebugEnabled()) log.debug("Engine resume returned " + response);
@@ -647,9 +648,21 @@
 				responseMessageContext.setSoapAction("");
 			}
 
+	        InvocationResponse response = null;
+	        
 			if (resenvelope!=null) {
-				AxisEngine.receive(responseMessageContext);
+				response = AxisEngine.receive(responseMessageContext);
 			}
+	        if(!InvocationResponse.SUSPEND.equals(response)) {
+	            // Performance work - need to close the XMLStreamReader to prevent GC thrashing.
+	            SOAPEnvelope env = responseMessageContext.getEnvelope();
+	            if(env!=null){
+	              StAXBuilder sb = (StAXBuilder)responseMessageContext.getEnvelope().getBuilder();
+	              if(sb!=null){
+	                sb.close();
+	              }            
+	            }
+	        }
 
 		} catch (Exception e) {
 			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noValidSyncResponse);

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java?rev=574490&r1=574489&r2=574490&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/WorkerLock.java Mon Sep 10 23:55:50 2007
@@ -1,25 +1,47 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
 package org.apache.sandesha2.workers;
 
-import java.util.ArrayList;
+import java.util.HashMap;
 
 public class WorkerLock {
 
-	public ArrayList workList = null;
+  private HashMap locks = new HashMap();
 	
 	public WorkerLock () {
-		workList = new ArrayList ();
+
 	}
 	
-	public synchronized void addWork (String work) {
-		workList.add(work);
-	}
+  public synchronized void addWork (String work, Object owner) {
+    if(locks.containsKey(work)) return;
+    locks.put(work, owner);
+  }
 	
 	public synchronized void removeWork (String work) {
-		workList.remove(work);
+    locks.remove(work);
 	}
 	
 	public synchronized boolean isWorkPresent (String work) {
-		return workList.contains(work);
+    return locks.containsKey(work);
 	}
+	
+	 public synchronized boolean ownsLock(String work, Object owner) {
+	    Object realOwner = locks.get(work);
+	    return realOwner == owner;
+	  }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org