You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by mc...@apache.org on 2007/11/19 18:07:24 UTC
svn commit: r596367 - in /webservices/sandesha/trunk/java/modules:
core/src/main/java/org/apache/sandesha2/msgprocessors/
core/src/main/java/org/apache/sandesha2/util/
core/src/main/java/org/apache/sandesha2/workers/
tests/src/test/java/org/apache/sand...
Author: mckierna
Date: Mon Nov 19 09:07:21 2007
New Revision: 596367
URL: http://svn.apache.org/viewvc?rev=596367&view=rev
Log: (empty)
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Mon Nov 19 09:07:21 2007
@@ -313,7 +313,7 @@
ackRequestRMMsg.setWSAAction(SpecSpecificConstants.getAckRequestAction (getRMVersion()));
ackRequestRMMsg.setSOAPAction(SpecSpecificConstants.getAckRequestSOAPAction (getRMVersion()));
- sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0);
+ sendOutgoingMessage(ackRequestRMMsg, Sandesha2Constants.MessageTypes.ACK_REQUEST, 0, null);
// Pause the message context
ackRequestRMMsg.pause();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java Mon Nov 19 09:07:21 2007
@@ -43,6 +43,7 @@
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
@@ -67,6 +68,7 @@
private String inboundSequence = null;
private long inboundMessageNumber;
+ private Transaction appMsgProcTran = null;
public ApplicationMsgProcessor() {
// Nothing to do
@@ -89,6 +91,7 @@
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processOutMessage");
+ appMsgProcTran= tran;
MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configContext = msgContext.getConfigurationContext();
@@ -244,6 +247,7 @@
String outSequenceID = null;
+
// Work out if there is a user transaction involved before updating any store state
// to give any storage manager interface a chance to setup any transactional state
boolean hasUserTransaction = storageManager.hasUserTransaction(msgContext);
@@ -264,16 +268,18 @@
if (rmsBean == null) {
rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);
-
- if (rmsBean == null && tran != null && tran.isActive()) {
+ if(rmsBean != null) outSequenceID = rmsBean.getSequenceID();
+
+ if (rmsBean == null && appMsgProcTran != null && appMsgProcTran.isActive()) {
// Rollback the current locks.
- tran.rollback();
+ appMsgProcTran.rollback();
// Create a new tran. This avoids a potential deadlock where the RMS/RMDBeans
// are taken in reverse order.
- tran = storageManager.getTransaction();
+ appMsgProcTran = storageManager.getTransaction();
}
}
+
}
} else {
@@ -435,15 +441,17 @@
//reliable message. If he doesn't have a endpoint he should use polling mechanisms.
msgContext.pause();
- if (tran != null && tran.isActive()) {
- tran.commit();
- tran = null;
+ if (appMsgProcTran != null && appMsgProcTran.isActive()) {
+ appMsgProcTran.commit();
+ appMsgProcTran = null;
}
}
+
finally {
- if (tran != null && tran.isActive())
- tran.rollback();
+ if (appMsgProcTran != null && appMsgProcTran.isActive())
+ appMsgProcTran.rollback();
}
+
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);
return true;
@@ -458,7 +466,7 @@
MessageContext applicationMsg = applicationRMMsg.getMessageContext();
ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
- // generating a new create sequeuce message.
+ // generating a new create sequence message.
RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
@@ -474,7 +482,7 @@
MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
createSeqMsg.setRelationships(null); // create seq msg does not
- // relateTo anything
+ // relateTo anything
String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key that will be used to store
//the create sequence message.
@@ -517,20 +525,76 @@
SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
storageManager.getSenderBeanMgr().insert(createSeqEntry);
-
+
+ if(appMsgProcTran != null && createSeqRMMessage.getMessageId() != null && !storageManager.hasUserTransaction(createSeqMsg)) {
+
+ // Lock the sender bean before we insert it, if we are planning to send it ourselves
+ String workId = createSeqEntry.getMessageID() + createSeqEntry.getTimeToSend();
+ SandeshaThread sender = storageManager.getSender();
+
+ ConfigurationContext context = createSeqMsg.getConfigurationContext();
+ WorkerLock lock = sender.getWorkerLock();
+
+ SenderWorker worker = new SenderWorker(context, createSeqEntry, rmsBean.getRMVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ // Actually take the lock
+ lock.addWork(workId, worker);
+
+ // Commit the transaction, so that the sender worker starts with a clean slate.
+ if(appMsgProcTran.isActive()) appMsgProcTran.commit();
+
+ if(worker != null) {
+ try {
+ worker.run();
+ } catch(Exception e) {
+ log.error("Caught exception running SandeshaWorker", e);
+ }
+ }
+
+ //Create transaction
+ appMsgProcTran = storageManager.getTransaction();
+
+ //Find RMSBean
+ RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
+ RMSBean tempRMSBean = new RMSBean();
+ tempRMSBean.setInternalSequenceID(rmsBean.getInternalSequenceID());
+ rmsBean = rmsBeanMgr.findUnique(tempRMSBean);
+
+ // If the RMSBean has been terminated this means that we may
+ // well have encountered a problem sending this message
+ if (rmsBean == null || rmsBean.isTerminated()){
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, Failed to establish sequence " + rmsBean);
+
+ if (rmsBean != null && rmsBean.getLastSendError() != null) {
+ if (rmsBean.getLastSendError() instanceof AxisFault)
+ throw (AxisFault)rmsBean.getLastSendError();
+ }
+ if (rmsBean.getLastSendError() != null)
+ throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused),
+ rmsBean.getLastSendError());
+
+ throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
+
+ }
+ }
// Setup enough of the workers to get this create sequence off the box.
SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
} else {
rmsBean = null;
}
-
+
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
+
return rmsBean;
}
private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
String storageKey, StorageManager storageManager, Transaction tran, boolean hasUserTransaction) throws AxisFault {
+
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
@@ -623,7 +687,7 @@
}
// Commit the transaction, so that the sender worker starts with a clean slate.
- if(tran != null && tran.isActive()) tran.commit();
+ if(appMsgProcTran != null && appMsgProcTran.isActive()) appMsgProcTran.commit();
if(worker != null) {
try {
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/CloseSequenceProcessor.java Mon Nov 19 09:07:21 2007
@@ -179,7 +179,7 @@
rmMsgCtx.setSOAPAction(SpecSpecificConstants.getCloseSequenceAction (getRMVersion()));
// Send this outgoing message
- sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0);
+ sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.CLOSE_SEQUENCE, 0, transaction);
// Pause the message context
rmMsgCtx.pause();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java Mon Nov 19 09:07:21 2007
@@ -44,6 +44,7 @@
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
+import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beans.RMDBean;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
@@ -56,6 +57,9 @@
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
import org.apache.sandesha2.util.WSRMMessageSender;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
import org.apache.sandesha2.wsrm.SequenceAcknowledgement;
import org.apache.sandesha2.wsrm.TerminateSequence;
@@ -399,8 +403,8 @@
// Set a retransmitter lastSentTime so that terminate will be send with
// some delay.
// Otherwise this get send before return of the current request (ack).
- // TODO: refine the terminate delay.
- sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY);
+ // TODO: refine the terminate delay.
+ sendOutgoingMessage(rmMsgCtx, Sandesha2Constants.MessageTypes.TERMINATE_SEQ, Sandesha2Constants.TERMINATE_DELAY, transaction);
// Pause the message context
rmMsgCtx.pause();
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/FaultManager.java Mon Nov 19 09:07:21 2007
@@ -771,17 +771,14 @@
return;
}
- /* if (rmsBean.getLastSendError() == null) {
- // Indicate that there was an error when sending the Create Sequence.
- rmsBean.setLastSendError(fault);
+ // Indicate that there was an error when sending the Create Sequence.
+ rmsBean.setLastSendError(fault);
+ // Mark the sequence as terminated
+ rmsBean.setTerminated(true);
- // Update the RMSBean
- rmsBeanMgr.update(rmsBean);
- if (log.isDebugEnabled())
- log.debug("Exit: FaultManager::processCreateSequenceRefusedFault Allowing another CreateSequence attempt");
- return;
- }
-*/
+ // Update the RMSBean
+ rmsBeanMgr.update(rmsBean);
+
SenderBean createSequenceSenderBean = retransmitterMgr.retrieve(createSeqMsgId);
if (createSequenceSenderBean == null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSeqEntryNotFound));
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java Mon Nov 19 09:07:21 2007
@@ -34,9 +34,13 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.SenderWorker;
+import org.apache.sandesha2.workers.WorkerLock;
public class WSRMMessageSender {
@@ -117,7 +121,7 @@
}
- protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay) throws AxisFault {
+ protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
@@ -154,6 +158,7 @@
{
senderBean.setSend(true);
senderBean.setSequenceID(outSequenceID);
+
}
else
senderBean.setSend(false);
@@ -173,10 +178,36 @@
SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
- SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
+ SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
retramsmitterMgr.insert(senderBean);
-
+
+ if (sequenceExists && !storageManager.hasUserTransaction(msgContext)) {
+
+ String workId = msgContext.getMessageID()
+ + senderBean.getTimeToSend();
+ SandeshaThread sender = storageManager.getSender();
+ WorkerLock lock = sender.getWorkerLock();
+
+ SenderWorker worker = new SenderWorker(configurationContext,
+ senderBean, rmsBean.getRMVersion());
+ worker.setLock(lock);
+ worker.setWorkId(workId);
+ // Actually take the lock
+ lock.addWork(workId, worker);
+
+ // Commit the transaction, so that the sender worker starts with a clean state
+ if (transaction != null && transaction.isActive())
+ transaction.commit();
+
+ if (worker != null) {
+ try {
+ worker.run();
+ } catch (Exception e) {
+ log.error("Caught exception running SandeshaWorker", e);
+ }
+ }
+ }
if (log.isDebugEnabled())
log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
Modified: webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SandeshaThread.java Mon Nov 19 09:07:21 2007
@@ -47,7 +47,7 @@
private boolean stopRequested = false;
private int sleepTime;
- private WorkerLock lock = null;
+ private WorkerLock lock = null;
private ArrayList workingSequences = new ArrayList();
@@ -56,9 +56,11 @@
protected StorageManager storageManager = null;
private boolean reRunThread;
+
public SandeshaThread(int sleepTime) {
this.sleepTime = sleepTime;
- lock = new WorkerLock ();
+ this.setDaemon(true);
+ lock = new WorkerLock ();
}
public final WorkerLock getWorkerLock() {
@@ -91,7 +93,7 @@
//ignore
}
}
-
+
//we can now request a pause - the next pause will be our
pauseRequired = true;
Modified: webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java?rev=596367&r1=596366&r2=596367&view=diff
==============================================================================
--- webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java (original)
+++ webservices/sandesha/trunk/java/modules/tests/src/test/java/org/apache/sandesha2/faulttests/CreateSequenceRefusedInboundFaultTest.java Mon Nov 19 09:07:21 2007
@@ -19,6 +19,7 @@
import java.io.File;
import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.addressing.EndpointReference;
@@ -99,7 +100,12 @@
serviceClient.setOptions(clientOptions);
TestCallback callback1 = new TestCallback ("Callback 1");
- serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo1",sequenceKey),callback1);
+ boolean caughtException = false;
+ try {
+ serviceClient.sendReceiveNonBlocking (getEchoOMBlock("echo1",sequenceKey),callback1);
+ } catch (AxisFault e) {
+ caughtException = true;
+ }
long limit = System.currentTimeMillis() + waitTime;
Error lastError = null;
@@ -112,7 +118,9 @@
assertEquals(sequenceReport.getSequenceStatus(),SequenceReport.SEQUENCE_STATUS_TERMINATED);
assertEquals(sequenceReport.getSequenceDirection(),SequenceReport.SEQUENCE_DIRECTION_OUT);
- assertTrue(callback1.isErrorReported());
+ if (!caughtException)
+ assertTrue(callback1.isErrorReported());
+
assertEquals(callback1.getResult(),null);
lastError = null;
@@ -130,4 +138,6 @@
}
}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org