You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/11/19 18:07:24 UTC

svn commit: r596367 - in /webservices/sandesha/trunk/java/modules: core/src/main/java/org/apache/sandesha2/msgprocessors/ core/src/main/java/org/apache/sandesha2/util/ core/src/main/java/org/apache/sandesha2/workers/ tests/src/test/java/org/apache/sandesha2/faulttests/

Author: mckierna
Date: Mon Nov 19 09:07:21 2007
New Revision: 596367

URL: http://svn.apache.org/viewvc?rev=596367&view=rev
Log: (empty)

Modified:
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
    webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
    webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Nov 19 09:07:21 2007
@@ -313,7 +313,7 @@
 		ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
 		ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
 
-		sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
+		sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0, null);
 		
 		// Pause the message context
 		ackRequestRMMsg.pause();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Nov 19 09:07:21 2007
@@ -43,6 +43,7 @@
 import org.apache.sandesha2.security.SecurityToken;
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
@@ -67,6 +68,7 @@
 
 	private String inboundSequence = null;
 	private long   inboundMessageNumber;
+	private Transaction appMsgProcTran = null;
 	
 	public ApplicationMsgProcessor() {
 		// Nothing to do
@@ -89,6 +91,7 @@
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
 
+		appMsgProcTran= tran;
 		MessageContext msgContext = rmMsgCtx.getMessageContext();
 		ConfigurationContext configContext = msgContext.getConfigurationContext();
 		
@@ -244,6 +247,7 @@
 		
 		String outSequenceID = null;
 
+
 		// Work out if there is a user transaction involved before updating any store state
 		// to give any storage manager interface a chance to setup any transactional state
 		boolean hasUserTransaction = storageManager.hasUserTransaction(msgContext);
@@ -264,16 +268,18 @@
 					if (rmsBean == null) {
 						rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
 						rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
-	
-						if (rmsBean == null && tran != null && tran.isActive()) {
+						if(rmsBean != null) outSequenceID = rmsBean.getSequenceID();
+						
+						if (rmsBean == null && appMsgProcTran != null && appMsgProcTran.isActive()) {
 							// Rollback the current locks.
-							tran.rollback();
+							appMsgProcTran.rollback();
 	
 							// Create a new tran.  This avoids a potential deadlock where the RMS/RMDBeans
 							// are taken in reverse order.
-							tran = storageManager.getTransaction();
+							appMsgProcTran = storageManager.getTransaction();
 						}
 					}
+
 				}
 	
 			} else {
@@ -435,15 +441,17 @@
 			//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
 			msgContext.pause();
 			
-			if (tran != null && tran.isActive()) {
-				tran.commit();
-				tran = null;
+			if (appMsgProcTran != null && appMsgProcTran.isActive()) {
+				appMsgProcTran.commit();
+				appMsgProcTran = null;
 			}
 		}
+
 		finally {
-			if (tran != null && tran.isActive())
-				tran.rollback();
+			if (appMsgProcTran != null && appMsgProcTran.isActive())
+				appMsgProcTran.rollback();
 		}
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
 		return true;
@@ -458,7 +466,7 @@
 		MessageContext applicationMsg = applicationRMMsg.getMessageContext();
 		ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
 
-		// generating a new create sequeuce message.
+		// generating a new create sequence message.
 		RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
 
 		createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
@@ -474,7 +482,7 @@
 
 		MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
 		createSeqMsg.setRelationships(null); // create seq msg does not
-												// relateTo anything
+											 // relateTo anything
 		
 		String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key that will be used to store 
 																	   //the create sequence message.
@@ -517,20 +525,76 @@
 			SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
 	
 			storageManager.getSenderBeanMgr().insert(createSeqEntry);
-	
+			
+			if(appMsgProcTran != null && createSeqRMMessage.getMessageId() != null && !storageManager.hasUserTransaction(createSeqMsg)) {
+
+				// Lock the sender bean before we insert it, if we are planning to send it ourselves
+				String workId = createSeqEntry.getMessageID() + createSeqEntry.getTimeToSend();
+				SandeshaThread sender = storageManager.getSender();
+
+				ConfigurationContext context = createSeqMsg.getConfigurationContext();
+				WorkerLock lock = sender.getWorkerLock();
+		
+				SenderWorker worker = new SenderWorker(context, createSeqEntry, rmsBean.getRMVersion());
+				worker.setLock(lock);
+				worker.setWorkId(workId);
+				// Actually take the lock
+				lock.addWork(workId, worker);
+			  
+				// Commit the transaction, so that the sender worker starts with a clean slate.
+				if(appMsgProcTran.isActive()) appMsgProcTran.commit();				
+						
+				if(worker != null) {
+					try {
+						worker.run();
+					} catch(Exception e)  {
+					log.error("Caught exception running SandeshaWorker", e);
+					}
+				}
+		
+				//Create transaction
+				appMsgProcTran = storageManager.getTransaction();
+			
+				//Find RMSBean
+				RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+				RMSBean tempRMSBean = new RMSBean();
+				tempRMSBean.setInternalSequenceID(rmsBean.getInternalSequenceID());
+				rmsBean = rmsBeanMgr.findUnique(tempRMSBean);
+			
+				// If the RMSBean has been terminated this means that we may 
+				// well have encountered a problem sending this message
+				if (rmsBean == null || rmsBean.isTerminated()){
+					
+					if (log.isDebugEnabled())
+						log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, Failed to establish sequence " + rmsBean);
+					
+					if (rmsBean != null && rmsBean.getLastSendError() != null) {
+						if (rmsBean.getLastSendError() instanceof AxisFault)
+							throw (AxisFault)rmsBean.getLastSendError();
+					}
+					if (rmsBean.getLastSendError() != null)
+						throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused), 
+								rmsBean.getLastSendError());
+					
+					throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
+						
+				}
+			}
 			// Setup enough of the workers to get this create sequence off the box.
 			SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
 		} else {
 			rmsBean = null;
 		}
-		
+				
 		if (log.isDebugEnabled())
 			log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+		
 		return rmsBean;
 	}
 
 	private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
 		    String storageKey, StorageManager storageManager, Transaction tran, boolean hasUserTransaction) throws AxisFault {
+
 		if (log.isDebugEnabled())
 			log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
 
@@ -623,7 +687,7 @@
 		}
 		
 		// Commit the transaction, so that the sender worker starts with a clean slate.
-		if(tran != null && tran.isActive()) tran.commit();
+		if(appMsgProcTran != null && appMsgProcTran.isActive()) appMsgProcTran.commit();
 		 
 		if(worker != null) {
 		  try {

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Nov 19 09:07:21 2007
@@ -179,7 +179,7 @@
 		rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
 
 		// Send this outgoing message
-		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0, transaction);
 
 		// Pause the message context
 		rmMsgCtx.pause();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Nov 19 09:07:21 2007
@@ -44,6 +44,7 @@
 import org.apache.sandesha2.storage.StorageManager;
 import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
@@ -56,6 +57,9 @@
 import org.apache.sandesha2.util.SpecSpecificConstants;
 import org.apache.sandesha2.util.TerminateManager;
 import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
 import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
 import org.apache.sandesha2.wsrm.TerminateSequence;
 
@@ -399,8 +403,8 @@
 		// Set a retransmitter lastSentTime so that terminate will be send with
 		// some delay.
 		// Otherwise this get send before return of the current request (ack).
-		// TODO: refine the terminate delay.
-		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY);		
+		// TODO: refine the terminate delay.		
+		sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY, transaction);		
 
 		// Pause the message context
 		rmMsgCtx.pause();

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon Nov 19 09:07:21 2007
@@ -771,17 +771,14 @@
 			return;			
 		}
 		
-	/*	if (rmsBean.getLastSendError() == null) {
-			// Indicate that there was an error when sending the Create Sequence.
-			rmsBean.setLastSendError(fault);
+		// Indicate that there was an error when sending the Create Sequence.
+		rmsBean.setLastSendError(fault);
+		// Mark the sequence as terminated
+		rmsBean.setTerminated(true);
 			
-			// Update the RMSBean
-			rmsBeanMgr.update(rmsBean);
-			if (log.isDebugEnabled())
-				log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Allowing another CreateSequence attempt");
-			return;
-		}
-*/
+		// Update the RMSBean
+		rmsBeanMgr.update(rmsBean);
+
 		SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
 		if (createSequenceSenderBean == null)
 			throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Nov 19 09:07:21 2007
@@ -34,9 +34,13 @@
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
 import org.apache.sandesha2.i18n.SandeshaMessageKeys;
 import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
 import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
 
 public class WSRMMessageSender  {
 
@@ -117,7 +121,7 @@
   }
 	
 	
-	protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+	protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay, Transaction transaction) throws AxisFault {
 		if (log.isDebugEnabled())
 			log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
 		
@@ -154,6 +158,7 @@
 		{
 			senderBean.setSend(true);
 			senderBean.setSequenceID(outSequenceID);
+				
 		}
 		else
 			senderBean.setSend(false);			
@@ -173,10 +178,36 @@
 
 		SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
 		
-    SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
+		SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
 	
 		retramsmitterMgr.insert(senderBean);
-		
+	
+		if (sequenceExists && !storageManager.hasUserTransaction(msgContext)) {
+
+			String workId = msgContext.getMessageID()
+					+ senderBean.getTimeToSend();
+			SandeshaThread sender = storageManager.getSender();
+			WorkerLock lock = sender.getWorkerLock();
+
+			SenderWorker worker = new SenderWorker(configurationContext,
+					senderBean, rmsBean.getRMVersion());
+			worker.setLock(lock);
+			worker.setWorkId(workId);
+			// Actually take the lock
+			lock.addWork(workId, worker);
+
+			// Commit the transaction, so that the sender worker starts with a clean state
+			if (transaction != null && transaction.isActive())
+				transaction.commit();
+
+			if (worker != null) {
+				try {
+					worker.run();
+				} catch (Exception e) {
+					log.error("Caught exception running SandeshaWorker", e);
+				}
+			}
+		}
 		if (log.isDebugEnabled())
 			log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
 

Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Nov 19 09:07:21 2007
@@ -47,7 +47,7 @@
 	private boolean stopRequested = false;
 	
 	private int sleepTime;
-  private WorkerLock lock = null;
+    private WorkerLock lock = null;
 
 	private ArrayList workingSequences = new ArrayList();
 	
@@ -56,9 +56,11 @@
 	protected StorageManager storageManager = null;
 	private boolean reRunThread;
 
+	
 	public SandeshaThread(int sleepTime) {
 		this.sleepTime = sleepTime;
-  	lock = new WorkerLock ();
+		this.setDaemon(true);
+  	    lock = new WorkerLock ();
 	}
 	
 	public final WorkerLock getWorkerLock() {
@@ -91,7 +93,7 @@
 				//ignore
 			}
 		}
-		
+				
 		//we can now request a pause - the next pause will be our
 		pauseRequired = true;
 				

Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java (original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java Mon Nov 19 09:07:21 2007
@@ -19,6 +19,7 @@
 import java.io.File;
 
 import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axis2.AxisFault;
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.addressing.EndpointReference;
@@ -99,7 +100,12 @@
 		serviceClient.setOptions(clientOptions);		
 		
 		TestCallback callback1 = new TestCallback ("Callback 1");
-		serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo1",sequenceKey),callback1);
+		boolean caughtException = false;
+		try {
+			serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo1",sequenceKey),callback1);
+		} catch (AxisFault e) {
+			caughtException = true;
+		}
         
 		long limit = System.currentTimeMillis() + waitTime;
 		Error lastError = null;
@@ -112,7 +118,9 @@
 				assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TERMINATED);
 				assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
 				
-				assertTrue(callback1.isErrorReported());
+				if (!caughtException)
+					assertTrue(callback1.isErrorReported());
+				
 				assertEquals(callback1.getResult(),null);
 				
 				lastError = null;
@@ -130,4 +138,6 @@
 	}
 	
 }
+
+
 



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org