You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2007/02/06 13:57:04 UTC
svn commit: r504115 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: msgprocessors/
polling/ storage/ storage/inmemory/ util/ workers/
Author: mlovett
Date: Tue Feb 6 04:57:01 2007
New Revision: 504115
URL: http://svn.apache.org/viewvc?view=rev&rev=504115
Log:
Make the PollingManager more similar to the other worker threads
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java Tue Feb 6 04:57:01 2007
@@ -34,7 +34,6 @@
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.engine.AxisEngine;
-import org.apache.axis2.engine.Handler.InvocationResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/AcknowledgementProcessor.java Tue Feb 6 04:57:01 2007
@@ -37,6 +37,7 @@
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
@@ -214,9 +215,10 @@
RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
RMDBean rMDBean = rMDBeanMgr.retrieve(outSequenceId);
- if (rMDBean!=null && rMDBean.isPollingMode())
- SandeshaUtil.shedulePollingRequest(rmsBean.getOfferedSequence(), configCtx);
-
+ if (rMDBean!=null && rMDBean.isPollingMode()) {
+ PollingManager manager = storageManager.getPollingManager();
+ manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ }
}
// We overwrite the previous client completed message ranges with the
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqMsgProcessor.java Tue Feb 6 04:57:01 2007
@@ -154,7 +154,7 @@
rMSBean.setCreateSeqMsgID(SandeshaUtil.getUUID());
rMSBean.setToEPR(rmdBean.getToEPR());
- rMSBean.setAcksToEPR(rmdBean.getAcksToEPR());
+ rMSBean.setAcksToEPR(rmdBean.getToEPR()); // The acks need to flow back into this endpoint
rMSBean.setReplyToEPR(rmdBean.getReplyToEPR());
rMSBean.setLastActivatedTime(System.currentTimeMillis());
rMSBean.setRMVersion(rmdBean.getRMVersion());
@@ -175,7 +175,22 @@
// Store the inbound token (if any) with the new sequence
rMSBean.setSecurityTokenData(rmdBean.getSecurityTokenData());
+ // If this new sequence has anonymous acksTo, then we must poll for the acks.
+ // If the inbound sequence is targeted at the WSRM anonymous URI, we need to start
+ // polling for this sequence.
+ String acksTo = rMSBean.getAcksToEPR();
+ EndpointReference reference = new EndpointReference(acksTo);
+ if ((acksTo == null || reference.hasAnonymousAddress()) &&
+ Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+ rMSBean.setPollingMode(true);
+ }
+
rmsBeanMgr.insert(rMSBean);
+
+ if(rMSBean.isPollingMode()) {
+ SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), false);
+ }
+
} else {
// removing the accept part.
createSeqResPart.setAccept(null);
@@ -188,8 +203,22 @@
outMessage.setResponseWritten(true);
rmdBean.setLastActivatedTime(System.currentTimeMillis());
+
+ // If the inbound sequence is targeted at the anonymous URI, we need to start
+ // polling for this sequence.
+ EndpointReference toEPR = createSeqMsg.getTo();
+ if (toEPR.hasAnonymousAddress()) {
+ if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqRMMsg.getRMSpecVersion())) {
+ rmdBean.setPollingMode(true);
+ }
+ }
+
storageManager.getRMDBeanMgr().update(rmdBean);
+ if(rmdBean.isPollingMode()) {
+ SandeshaUtil.startPollingForTheSequence(context, rmdBean.getSequenceID(), false);
+ }
+
AxisEngine engine = new AxisEngine(context);
try{
engine.send(outMessage);
@@ -204,27 +233,22 @@
return false;
}
- boolean anon = true;
- if (rmdBean.getToEPR() != null) {
- EndpointReference toEPR = new EndpointReference(rmdBean.getToEPR());
- if (!toEPR.hasAnonymousAddress()) anon = false;
- }
- if(anon) {
+ EndpointReference replyTo = createSeqMsg.getReplyTo();
+ if(replyTo == null || replyTo.hasAnonymousAddress()) {
createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
} else {
- createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+ createSeqMsg.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
}
// SequencePropertyBean findBean = new SequencePropertyBean ();
// findBean.setName (Sandesha2Constants.SequenceProperties.TERMINATE_ON_CREATE_SEQUENCE);
// findBean.setValue(createSeqMsg.getTo().getAddress());
- String toAddress = createSeqMsg.getTo().getAddress();
//if toAddress is RMAnon we may need to terminate the request side sequence here.
- if (toAddress!=null && SandeshaUtil.isWSRMAnonymous(toAddress)) {
+ if (toEPR.hasAnonymousAddress()) {
RMSBean findBean = new RMSBean ();
- findBean.setReplyToEPR(toAddress);
+ findBean.setReplyToEPR(toEPR.getAddress());
findBean.setTerminationPauserForCS(true);
//TODO recheck
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/CreateSeqResponseMsgProcessor.java Tue Feb 6 04:57:01 2007
@@ -133,10 +133,10 @@
// the create must include an offer (or this client cannot be identified). If the reply-to
// is the RM anon URI template then the offer is not required.
if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
- String replyToAddress = rmsBean.getReplyToEPR();
- if(SandeshaUtil.isWSRMAnonymous(replyToAddress)) {
+ String acksTo = rmsBean.getAcksToEPR();
+ EndpointReference reference = new EndpointReference(acksTo);
+ if(acksTo == null || reference.hasAnonymousAddress()) {
rmsBean.setPollingMode(true);
- SandeshaUtil.startPollingManager(configCtx);
}
}
@@ -181,11 +181,8 @@
// rmdBean for polling too, so that it still gets serviced after the outbound
// sequence terminates.
if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(createSeqResponseRMMsgCtx.getRMSpecVersion())) {
- String replyToAddress = rmsBean.getReplyToEPR();
- EndpointReference ref = new EndpointReference(replyToAddress);
- if(rmsBean.isPollingMode() && (replyToAddress == null || ref.hasAnonymousAddress())) {
+ if(rmsBean.isPollingMode()) {
rMDBean.setPollingMode(true);
- SandeshaUtil.startPollingManager(configCtx);
}
}
@@ -213,10 +210,18 @@
rMDBean.setSecurityTokenData(rmsBean.getSecurityTokenData());
rmdBeanMgr.insert(rMDBean);
+
+ if(rMDBean.isPollingMode()) {
+ SandeshaUtil.startPollingForTheSequence(configCtx, rMDBean.getSequenceID(), false);
+ }
}
rmsBean.setLastActivatedTime(System.currentTimeMillis());
rmsBeanMgr.update(rmsBean);
+
+ if(rmsBean.isPollingMode()) {
+ SandeshaUtil.startPollingForTheSequence(configCtx, rmsBean.getSequenceID(), true);
+ }
// Locate and update all of the messages for this sequence, now that we know
// the sequence id.
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/MessagePendingProcessor.java Tue Feb 6 04:57:01 2007
@@ -1,12 +1,14 @@
package org.apache.sandesha2.msgprocessors;
import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.polling.PollingManager;
+import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.MessagePending;
@@ -33,10 +35,10 @@
if (messagePending!=null) {
boolean pending = messagePending.isPending();
if (pending) {
- PollingManager pollingManager = SandeshaUtil.getPollingManager(message.getConfigurationContext());
- if (pollingManager!=null) {
- pollingManager.shedulePollingRequest(sequenceId);
- }
+ ConfigurationContext context = rmMsgContext.getConfigurationContext();
+ StorageManager storage = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+ PollingManager pollingManager = storage.getPollingManager();
+ pollingManager.schedulePollingRequest(sequenceId, false);
}
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/msgprocessors/TerminateSeqResponseMsgProcessor.java Tue Feb 6 04:57:01 2007
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
+import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
@@ -67,16 +68,15 @@
msgContext.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,rmsBean.getInternalSequenceID());
- ConfigurationContext configContext = msgContext.getConfigurationContext();
-
//shedulling a polling request for the response side.
-
if (rmsBean.getOfferedSequence()!=null) {
RMDBeanMgr rMDBeanMgr = storageManager.getRMDBeanMgr();
RMDBean rMDBean = rMDBeanMgr.retrieve(sequenceId);
- if (rMDBean!=null && rMDBean.isPollingMode())
- SandeshaUtil.shedulePollingRequest(rmsBean.getOfferedSequence(), configContext);
+ if (rMDBean!=null && rMDBean.isPollingMode()) {
+ PollingManager manager = storageManager.getPollingManager();
+ manager.schedulePollingRequest(rMDBean.getSequenceID(), false);
+ }
}
TerminateManager.terminateSendingSide (rmsBean, msgContext.isServerSide(), storageManager);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/polling/PollingManager.java Tue Feb 6 04:57:01 2007
@@ -17,12 +17,11 @@
package org.apache.sandesha2.polling;
-import java.util.HashMap;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.LinkedList;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,7 +29,6 @@
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.storage.SandeshaStorageException;
-import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
@@ -42,118 +40,116 @@
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.RMMsgCreator;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.SandeshaThread;
/**
* This class is responsible for sending MakeConnection requests. This is a seperate thread that
* keeps running. Will do MakeConnection based on the request queue or randomly.
*/
-public class PollingManager extends Thread {
+public class PollingManager extends SandeshaThread {
private static final Log log = LogFactory.getLog(PollingManager.class);
- private ConfigurationContext configurationContext = null;
- private StorageManager storageManager = null;
- private boolean poll = false;
-
// Variables used to help round-robin across the sequences that we can poll for
- private int rmsIndex = 0;
- private int rmdIndex = 0;
+ private int nextIndex = 0;
/**
* By adding an entry to this, the PollingManager will be asked to do a polling request on this sequence.
*/
- private HashMap sheduledPollingRequests = null;
+ private LinkedList scheduledPollingRequests = new LinkedList();
- private final int POLLING_MANAGER_WAIT_TIME = 3000;
+ private static final int POLLING_MANAGER_WAIT_TIME = 3000;
- private void internalRun() {
- while (isPoll()) {
- Transaction t = null;
- try {
- t = storageManager.getTransaction();
- pollRMDSide();
- if(t != null) t.commit();
- t = null;
-
- t = storageManager.getTransaction();
- pollRMSSide();
- if(t != null) t.commit();
- t = null;
- } catch (Exception e) {
- if(log.isDebugEnabled()) log.debug("Exception", e);
- if(t != null) {
- try {
- t.rollback();
- } catch(Exception e2) {
- if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
- }
- }
- }
- try {
- Thread.sleep(POLLING_MANAGER_WAIT_TIME);
- } catch (InterruptedException e) {
- if(log.isDebugEnabled()) log.debug("Sleep was interrupted", e);
- }
- }
+ public PollingManager() {
+ super(POLLING_MANAGER_WAIT_TIME);
}
- public void run() {
+ protected boolean internalRun() {
+ if(log.isDebugEnabled()) log.debug("Enter: PollingManager::internalRun");
+ Transaction t = null;
try {
- internalRun();
- } catch(Exception e) {
- if(log.isDebugEnabled()) log.debug("PollingManager thread ending", e);
+ // If we have requests scheduled, handle them first; otherwise pick
+ // a sequence using a round-robin approach.
+ SequenceEntry entry = null;
+ synchronized (this) {
+ if(!scheduledPollingRequests.isEmpty()) {
+ entry = (SequenceEntry) scheduledPollingRequests.removeFirst();
+ }
+ }
+ if(entry == null) {
+ ArrayList allSequencesList = getSequences();
+ int size = allSequencesList.size();
+ if(log.isDebugEnabled()) log.debug("Choosing one from " + size + " sequences");
+ if(nextIndex >= size) {
+ nextIndex = 0;
+ // We just looped over the set of sequences, so sleep before we try
+ // polling them again.
+ if (log.isDebugEnabled()) log.debug("Exit: PollingManager::internalRun, looped over all sequences, sleeping");
+ return true;
+ }
+
+ entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+ }
+ if(log.isDebugEnabled()) log.debug("Chose sequence " + entry.getSequenceId());
+
+ t = storageManager.getTransaction();
+ if(entry.isRmSource()) {
+ pollRMSSide(entry);
+ } else {
+ pollRMDSide(entry);
+ }
+ if(t != null) t.commit();
+ t = null;
+
+ } catch (Exception e) {
+ if(log.isDebugEnabled()) log.debug("Exception", e);
+ if(t != null) {
+ try {
+ t.rollback();
+ } catch(Exception e2) {
+ if(log.isDebugEnabled()) log.debug("Exception during rollback", e);
+ }
+ }
}
+
+ if(log.isDebugEnabled()) log.debug("Exit: PollingManager::internalRun, not sleeping");
+ return false;
}
- private void pollRMSSide() throws AxisFault {
+ private void pollRMSSide(SequenceEntry entry) throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMSSide");
RMSBeanMgr rmsBeanManager = storageManager.getRMSBeanMgr();
RMSBean findRMS = new RMSBean();
+ findRMS.setSequenceID(entry.getSequenceId());
findRMS.setPollingMode(true);
findRMS.setTerminated(false);
- List results = rmsBeanManager.find(findRMS);
+ RMSBean beanToPoll = rmsBeanManager.findUnique(findRMS);
- int size = results.size();
- log.debug("Choosing one from " + size + " RMS sequences");
- if(rmsIndex >= size) {
- rmsIndex = 0;
- if (size == 0) {
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide, nothing to poll");
- return;
- }
+ if(beanToPoll == null) {
+ // This sequence must have been terminated, or deleted
+ stopThreadForSequence(entry.getSequenceId(), true);
+ } else {
+ pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
}
- RMSBean beanToPoll = (RMSBean) results.get(rmsIndex++);
- pollForSequence(beanToPoll.getSequenceID(), beanToPoll.getInternalSequenceID(), beanToPoll.getReferenceMessageStoreKey(), beanToPoll);
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMSSide");
}
- private void pollRMDSide() throws AxisFault {
+ private void pollRMDSide(SequenceEntry entry) throws AxisFault {
if(log.isDebugEnabled()) log.debug("Enter: PollingManager::pollRMDSide");
- //geting the sequences to be polled.
- //if shedule contains any requests, do the earliest one.
- //else pick one randomly.
RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
- String sequenceId = getNextSheduleEntry ();
-
RMDBean findBean = new RMDBean();
findBean.setPollingMode(true);
findBean.setTerminated(false);
- findBean.setSequenceID(sequenceId); // Note that this may be null
- List results = nextMsgMgr.find(findBean);
-
- int size = results.size();
+ findBean.setSequenceID(entry.getSequenceId());
+ RMDBean nextMsgBean = nextMsgMgr.findUnique(findBean);
- log.debug("Choosing one from " + size + " RMD sequences");
- if(rmdIndex >= size) {
- rmdIndex = 0;
- if (size == 0) {
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide, nothing to poll");
- return;
- }
+ if(nextMsgBean == null) {
+ // This sequence must have been terminated, or deleted
+ stopThreadForSequence(entry.getSequenceId(), false);
+ } else {
+ pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
}
- RMDBean nextMsgBean = (RMDBean) results.get(rmdIndex++);
- pollForSequence(nextMsgBean.getSequenceID(), nextMsgBean.getSequenceID(), nextMsgBean.getReferenceMessageKey(), nextMsgBean);
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollRMDSide");
}
@@ -176,7 +172,7 @@
WSRMAnonReplyToURI = replyTo;
}
- MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,configurationContext);
+ MessageContext referenceMessage = storageManager.retrieveMessageContext(referenceMsgKey,context);
RMMsgContext referenceRMMessage = MsgInitializer.initializeMessage(referenceMessage);
RMMsgContext makeConnectionRMMessage = RMMsgCreator.createMakeConnectionMessage(referenceRMMessage,
rmBean, sequenceId, WSRMAnonReplyToURI, storageManager);
@@ -213,84 +209,18 @@
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::pollForSequence");
}
- private synchronized String getNextSheduleEntry () {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::getNextSheduleEntry");
- String sequenceId = null;
-
- if (sheduledPollingRequests.size()>0) {
- sequenceId = (String) sheduledPollingRequests.keySet().iterator().next();
- Integer sequencEntryCount = (Integer) sheduledPollingRequests.remove(sequenceId);
-
- Integer leftCount = new Integer (sequencEntryCount.intValue() -1 );
- if (leftCount.intValue() > 0)
- sheduledPollingRequests.put(sequenceId, leftCount);
- }
-
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::getNextSheduleEntry, " + sequenceId);
- return sequenceId;
- }
-
- /**
- * Starts the PollingManager.
- *
- * @param configurationContext
- * @throws SandeshaException
- */
- public synchronized void start (ConfigurationContext configurationContext) throws SandeshaException {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::start");
-
- this.configurationContext = configurationContext;
- this.sheduledPollingRequests = new HashMap ();
- this.storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
- setPoll(true);
- super.start();
-
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::start");
- }
-
- /**
- * Asks the PollingManager to stop its work.
- *
- */
- public synchronized void stopPolling () {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::stopPolling");
- setPoll(false);
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::stopPolling");
- }
-
- public synchronized void setPoll (boolean poll) {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::setPoll");
- this.poll = poll;
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::setPoll");
- }
-
- public synchronized boolean isPoll () {
- if(log.isDebugEnabled()) log.debug("Enter: PollingManager::isPoll");
- if(log.isDebugEnabled()) log.debug("Exit: PollingManager::isPoll");
- return poll;
- }
-
- public void start () {
- throw new UnsupportedOperationException ("You must use the oveerloaded start method");
- }
-
/**
* Asking the polling manager to do a polling request on the sequence identified by the
* given InternalSequenceId.
*
* @param sequenceId
*/
- public synchronized void shedulePollingRequest (String sequenceId) {
+ public synchronized void schedulePollingRequest(String sequenceId, boolean rmSource) {
if(log.isDebugEnabled()) log.debug("Enter: PollingManager::shedulePollingRequest, " + sequenceId);
- if (sheduledPollingRequests.containsKey (sequenceId)) {
- Integer sequenceEntryCount = (Integer) sheduledPollingRequests.get(sequenceId);
- Integer newCount = new Integer (sequenceEntryCount.intValue()+1);
- sheduledPollingRequests.put(sequenceId,newCount);
- } else {
- Integer sequenceEntryCount = new Integer (1);
- sheduledPollingRequests.put(sequenceId, sequenceEntryCount);
- }
+ SequenceEntry entry = new SequenceEntry(sequenceId, rmSource);
+ scheduledPollingRequests.add(entry);
+ this.wakeThread();
if(log.isDebugEnabled()) log.debug("Exit: PollingManager::shedulePollingRequest");
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java Tue Feb 6 04:57:01 2007
@@ -20,6 +20,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisModule;
+import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -52,6 +53,7 @@
//shutdown the running threads
getSender().stopRunning();
getInvoker().stopRunning();
+ getPollingManager().stopRunning();
}
public abstract void initStorage (AxisModule moduleDesc) throws SandeshaStorageException;
@@ -61,6 +63,8 @@
public abstract SandeshaThread getSender();
public abstract SandeshaThread getInvoker();
+
+ public abstract PollingManager getPollingManager();
public abstract RMSBeanMgr getRMSBeanMgr();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Tue Feb 6 04:57:01 2007
@@ -38,6 +38,7 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
+import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -63,6 +64,7 @@
private InvokerBeanMgr invokerBeanMgr = null;
private Sender sender = null;
private Invoker invoker = null;
+ private PollingManager pollingManager = null;
private HashMap transactions = new HashMap();
private boolean useSerialization = false;
@@ -80,7 +82,7 @@
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
this.sender = new Sender();
this.invoker = new Invoker();
-
+ this.pollingManager = new PollingManager();
}
public Transaction getTransaction() {
@@ -128,14 +130,21 @@
*/
public SandeshaThread getInvoker() {
return invoker;
- }
+ }
/**
* Gets the Sender for this Storage manager
*/
public SandeshaThread getSender() {
return sender;
- }
+ }
+
+ /**
+ * Gets the PollingManager for this Storage manager
+ */
+ public PollingManager getPollingManager() {
+ return pollingManager;
+ }
void enlistBean(RMBean bean) throws SandeshaStorageException {
InMemoryTransaction t = null;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java Tue Feb 6 04:57:01 2007
@@ -83,12 +83,13 @@
set.add(this);
while(other != null) {
if(set.contains(other)) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
+ SandeshaStorageException e = new SandeshaStorageException(message);
+
// Do our best to get out of the way of the other work in the system
waitingForTran = null;
releaseLocks();
-
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.deadlock, this.toString(), bean.toString());
- SandeshaStorageException e = new SandeshaStorageException(message);
+
if(log.isDebugEnabled()) log.debug(message, e);
throw e;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Tue Feb 6 04:57:01 2007
@@ -71,7 +71,6 @@
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.policy.SandeshaPolicyBean;
-import org.apache.sandesha2.polling.PollingManager;
import org.apache.sandesha2.security.SecurityManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
@@ -148,7 +147,7 @@
log.debug("Enter: SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID " + sequenceID);
SandeshaThread sender = getSandeshaStorageManager(context, context.getAxisConfiguration()).getSender();
- sender.runThreadForSequence(context, sequenceID);
+ sender.runThreadForSequence(context, sequenceID, true);
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::startSenderForTheSequence");
@@ -159,13 +158,16 @@
log.debug("Enter: SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID " + sequenceID);
SandeshaThread invoker = getSandeshaStorageManager(context, context.getAxisConfiguration()).getInvoker();
- invoker.runThreadForSequence(context,sequenceID);
+ invoker.runThreadForSequence(context, sequenceID, false);
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::startInvokerForTheSequence");
}
- public static void startPollingManager (ConfigurationContext configurationContext) throws SandeshaException {
+ public static void startPollingForTheSequence(ConfigurationContext configurationContext, String sequenceID, boolean rmSource) throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaUtil::startPollingForTheSequence , context " + configurationContext + ", sequenceID " + sequenceID + ", rmSource");
+
// Only start the polling manager if we are configured to use MakeConnection
SandeshaPolicyBean policy = getPropertyBean(configurationContext.getAxisConfiguration());
if(!policy.isEnableMakeConnection()) {
@@ -173,22 +175,11 @@
throw new SandeshaException(message);
}
- PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
- Sandesha2Constants.POLLING_MANAGER);
-
- //assums that if somebody hs set the PollingManager, he must hv already started it.
- if (pollingManager==null) {
- pollingManager = new PollingManager ();
- configurationContext.setProperty(Sandesha2Constants.POLLING_MANAGER,pollingManager);
- pollingManager.start(configurationContext);
- }
- }
-
- public static void stopPollingManager (ConfigurationContext configurationContext) {
- PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
- Sandesha2Constants.POLLING_MANAGER);
- if (pollingManager!=null)
- pollingManager.stopPolling ();
+ SandeshaThread polling = getSandeshaStorageManager(configurationContext, configurationContext.getAxisConfiguration()).getPollingManager();
+ polling.runThreadForSequence(configurationContext, sequenceID, rmSource);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaUtil::startPollingForTheSequence");
}
public static String getMessageTypeString(int messageType) {
@@ -1079,18 +1070,6 @@
return newMsg;
- }
-
- public static PollingManager getPollingManager (ConfigurationContext configurationContext) {
- PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
- Sandesha2Constants.POLLING_MANAGER);
-
- return pollingManager;
- }
-
- public static void shedulePollingRequest (String sequenceId, ConfigurationContext configurationContext) {
- PollingManager pollingManager = getPollingManager(configurationContext);
- pollingManager.shedulePollingRequest(sequenceId);
}
public static EndpointReference cloneEPR (EndpointReference epr) {
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Tue Feb 6 04:57:01 2007
@@ -47,7 +47,12 @@
public class Invoker extends SandeshaThread {
private static final Log log = LogFactory.getLog(Invoker.class);
-
+
+ // If this invoker is working for several sequences, we use round-robin to
+ // try and give them all a chance to invoke messages.
+ int nextIndex = 0;
+ boolean processedMessage = false;
+
public Invoker() {
super(Sandesha2Constants.INVOKER_SLEEP_TIME);
}
@@ -69,9 +74,6 @@
blockForPause();
try{
//get all invoker beans for the sequence
- StorageManager storageManager =
- SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
-
InvokerBeanMgr storageMapMgr = storageManager
.getInvokerBeanMgr();
RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
@@ -201,185 +203,161 @@
log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
}
- protected void internalRun() {
- if (log.isDebugEnabled())
- log.debug("Enter: InOrderInvoker::internalRun");
+ protected boolean internalRun() {
+ if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");
- // If this invoker is working for several sequences, we use round-robin to
- // try and give them all a chance to invoke messages.
- int nextIndex = 0;
boolean sleep = false;
- boolean processedMessage = false;
+ Transaction transaction = null;
+
+ try {
+ RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
- while (isThreadStarted()) {
+ InvokerBeanMgr storageMapMgr = storageManager
+ .getInvokerBeanMgr();
- try {
- if(sleep && !runMainLoop()) Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
- if (!isThreadStarted())
- continue;
- // Indicate that we are running the main loop
- setRanMainLoop();
- } catch (InterruptedException ex) {
- log.debug("Invoker was Interrupted.", ex);
- } finally {
- sleep = false;
+ transaction = storageManager.getTransaction();
+
+ // Pick a sequence using a round-robin approach
+ ArrayList allSequencesList = getSequences();
+ int size = allSequencesList.size();
+ log.debug("Choosing one from " + size + " sequences");
+ if(nextIndex >= size) {
+ nextIndex = 0;
+
+ // We just looped over the set of sequences. If we didn't process any
+ // messages on this loop then we sleep before the next one
+ if(size == 0 || !processedMessage) {
+ sleep = true;
+ }
+ processedMessage = false;
+
+ if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);
+ return sleep;
}
- //pause if we have to
- doPauseIfNeeded();
+ SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
+ String sequenceId = entry.getSequenceId();
+ log.debug("Chose sequence " + sequenceId);
+
+ RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
+ if (nextMsgBean == null) {
+ log.debug("Next message not set correctly. Removing invalid entry.");
+
+ stopThreadForSequence(sequenceId, entry.isRmSource());
+ allSequencesList = getSequences();
+ if (allSequencesList.size() == 0)
+ sleep = true;
- Transaction transaction = null;
+ if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);
+ return sleep;
+ }
- try {
- StorageManager storageManager = SandeshaUtil
- .getSandeshaStorageManager(context, context
- .getAxisConfiguration());
- RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
-
- InvokerBeanMgr storageMapMgr = storageManager
- .getInvokerBeanMgr();
-
- transaction = storageManager.getTransaction();
-
- // Pick a sequence using a round-robin approach
- ArrayList allSequencesList = getSequences();
- int size = allSequencesList.size();
- log.debug("Choosing one from " + size + " sequences");
- if(nextIndex >= size) {
- nextIndex = 0;
-
- // We just looped over the set of sequences. If we didn't process any
- // messages on this loop then we sleep before the next one
- if(size == 0 || !processedMessage) {
- sleep = true;
- }
- processedMessage = false;
- continue;
- }
- String sequenceId = (String) allSequencesList.get(nextIndex++);
- log.debug("Chose sequence " + sequenceId);
+ long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
+ if (nextMsgno <= 0) {
+ // Make sure we sleep on the next loop, so that we don't spin in a tight loop
+ sleep = true;
+ if (log.isDebugEnabled())
+ log.debug("Invalid Next Message Number " + nextMsgno);
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.invalidMsgNumber, Long
+ .toString(nextMsgno));
+ throw new SandeshaException(message);
+ }
- RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
- if (nextMsgBean == null) {
- String message = "Next message not set correctly. Removing invalid entry.";
- log.debug(message);
-
- stopThreadForSequence(sequenceId);
- allSequencesList = getSequences();
- if (allSequencesList.size() == 0)
- sleep = true;
- continue;
- }
+ InvokerBean selector = new InvokerBean();
+ selector.setSequenceID(sequenceId);
+ selector.setMsgNo(nextMsgno);
+ List invokerBeans = storageMapMgr.find(selector);
+
+ //add any msgs that belong to out of order windows
+ addOutOfOrderInvokerBeansToList(sequenceId,
+ storageManager, invokerBeans);
+
+ // If there aren't any beans to process then move on to the next sequence
+ if (invokerBeans.size() == 0) {
+ if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);
+ return sleep;
+ }
+
+ Iterator stMapIt = invokerBeans.iterator();
- long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
- if (nextMsgno <= 0) {
- // Make sure we sleep on the next loop, so that we don't spin in a tight loop
+ //TODO correct the locking mechanism to have one lock per sequence.
+ //TODO should this be a while, not an if?
+ if (stMapIt.hasNext()) { //some invokation work is present
+
+ InvokerBean bean = (InvokerBean) stMapIt.next();
+ //see if this is an out of order msg
+ boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
+
+ String workId = sequenceId + "::" + bean.getMsgNo();
+ //creating a workId to uniquely identify the
+ //piece of work that will be assigned to the Worker.
+
+ //check whether the bean is already assigned to a worker.
+ if (getWorkerLock().isWorkPresent(workId)) {
+ // As there is already a worker assigned we are probably dispatching
+ // messages too quickly, so we sleep before trying the next sequence.
sleep = true;
- if (log.isDebugEnabled())
- log.debug("Invalid Next Message Number " + nextMsgno);
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.invalidMsgNumber, Long
- .toString(nextMsgno));
- throw new SandeshaException(message);
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
+ if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+ return sleep;
}
- InvokerBean selector = new InvokerBean();
- selector.setSequenceID(sequenceId);
- selector.setMsgNo(nextMsgno);
- List invokerBeans = storageMapMgr.find(selector);
-
- //add any msgs that belong to out of order windows
- addOutOfOrderInvokerBeansToList(sequenceId,
- storageManager, invokerBeans);
-
- // If there aren't any beans to process then move on to the next sequence
- if (invokerBeans.size() == 0) {
- if (log.isDebugEnabled())
- log.debug("No beans to invoke on sequence " + sequenceId);
- continue;
- }
+ String messageContextKey = bean.getMessageContextRefKey();
- Iterator stMapIt = invokerBeans.iterator();
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
- //TODO correct the locking mechanism to have one lock per sequence.
+ // start a new worker thread and let it do the invocation.
+ InvokerWorker worker = new InvokerWorker(context,
+ messageContextKey,
+ beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
+ //out of order message
- if (stMapIt.hasNext()) { //some invokation work is present
- if (!isThreadStarted())
- continue;
-
- InvokerBean bean = (InvokerBean) stMapIt.next();
- //see if this is an out of order msg
- boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
-
- String workId = sequenceId + "::" + bean.getMsgNo();
- //creating a workId to uniquely identify the
- //piece of work that will be assigned to the Worker.
-
- //check whether the bean is already assigned to a worker.
- if (getWorkerLock().isWorkPresent(workId)) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
- log.debug(message);
-
- // As there is already a worker assigned we are probably dispatching
- // messages too quickly, so we sleep before trying the next sequence.
- sleep = true;
- continue;
- }
-
- String messageContextKey = bean.getMessageContextRefKey();
-
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
-
- // start a new worker thread and let it do the invocation.
- InvokerWorker worker = new InvokerWorker(context,
- messageContextKey,
- beanIsOutOfOrderMsg); //only ignore nextMsgNumber if the bean is an
- //out of order message
-
- worker.setLock(getWorkerLock());
- worker.setWorkId(workId);
-
- threadPool.execute(worker);
-
- //adding the workId to the lock after assigning it to a thread makes sure
- //that all the workIds in the Lock are handled by threads.
- getWorkerLock().addWork(workId);
-
- processedMessage = true;
- }
- } catch (Exception e) {
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.rollbackError, e1
- .toString());
- log.debug(message, e1);
- }
+ worker.setLock(getWorkerLock());
+ worker.setWorkId(workId);
+
+ threadPool.execute(worker);
+
+ //adding the workId to the lock after assigning it to a thread makes sure
+ //that all the workIds in the Lock are handled by threads.
+ getWorkerLock().addWork(workId);
+
+ processedMessage = true;
+ }
+ } catch (Exception e) {
+ if (transaction != null) {
+ try {
+ transaction.rollback();
+ transaction = null;
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.rollbackError, e1
+ .toString());
+ log.debug(message, e1);
}
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.invokeMsgError);
- log.debug(message, e);
- } finally {
- if (transaction != null) {
- try {
- transaction.commit();
- transaction = null;
- } catch (Exception e) {
- String message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
- }
+ }
+ String message = SandeshaMessageHelper
+ .getMessage(SandeshaMessageKeys.invokeMsgError);
+ log.debug(message, e);
+ } finally {
+ if (transaction != null) {
+ try {
+ transaction.commit();
+ transaction = null;
+ } catch (Exception e) {
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.commitError, e.toString());
+ log.debug(message, e);
}
}
}
+
if (log.isDebugEnabled())
log.debug("Exit: InOrderInvoker::internalRun");
+ return sleep;
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Tue Feb 6 04:57:01 2007
@@ -22,8 +22,11 @@
import org.apache.axis2.util.threadpool.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+import org.apache.sandesha2.storage.StorageManager;
+import org.apache.sandesha2.util.SandeshaUtil;
/**
* Aggregates pause and stop logic between sender and invoker threads.
@@ -44,6 +47,7 @@
protected transient ThreadFactory threadPool;
protected ConfigurationContext context = null;
+ protected StorageManager storageManager = null;
private boolean reRunThread;
public SandeshaThread(int sleepTime) {
@@ -55,7 +59,7 @@
return lock;
}
- public synchronized void stopThreadForSequence(String sequenceID){
+ public synchronized void stopThreadForSequence(String sequenceID, boolean rmSource){
if (log.isDebugEnabled())
log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
@@ -63,7 +67,7 @@
// to sleep when there is no work to do. If we were to exit the thread then
// we wouldn't be able to start back up when the thread gets some more work
// to do.
- workingSequences.remove(sequenceID);
+ workingSequences.remove(new SequenceEntry(sequenceID, rmSource));
if (log.isDebugEnabled())
log.debug("Exit: SandeshaThread::stopThreadForSequence");
@@ -138,15 +142,17 @@
}
- public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID){
+ public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID, boolean rmSource){
if(log.isDebugEnabled()) log.debug("Entry: SandeshaThread::runThreadForSequence, " + this);
- if (!workingSequences.contains(sequenceID)) workingSequences.add(sequenceID);
+ SequenceEntry entry = new SequenceEntry(sequenceID, rmSource);
+ if (!workingSequences.contains(entry)) workingSequences.add(entry);
if (!isThreadStarted()) {
if(log.isDebugEnabled()) log.debug("Starting thread");
this.context = context;
+
// Get the axis2 thread pool
threadPool = context.getThreadPool();
@@ -164,6 +170,10 @@
if(log.isDebugEnabled()) log.debug("Exit: SandeshaThread::runThreadForSequence");
}
+ /**
+ *
+ * @return a List of SequenceEntry instances
+ */
public synchronized ArrayList getSequences() {
return workingSequences;
}
@@ -223,13 +233,54 @@
}
/**
- * The main work loop, to be implemented by any child class.
+ * The main work loop, to be implemented by any child class. If the child wants
+ * to sleep before the next loop then they should return true.
*/
- protected abstract void internalRun();
+ protected abstract boolean internalRun();
public void run() {
try {
- internalRun();
+ boolean sleep = false;
+
+ while (isThreadStarted()) {
+ try {
+ synchronized (this) {
+ if(sleep && !runMainLoop()) wait(sleepTime);
+ // Indicate that we are running the main loop
+ setRanMainLoop();
+ }
+ } catch (InterruptedException e1) {
+ log.debug("Sender was interupted...");
+ log.debug(e1.getMessage());
+ log.debug("End printing Interrupt...");
+ }
+
+ //pause if we have to
+ doPauseIfNeeded();
+
+ // Ensure we have context and a storage manager
+ if (context == null) {
+ String message = SandeshaMessageHelper
+ .getMessage(SandeshaMessageKeys.configContextNotSet);
+ message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.cannotCointinueSender, message);
+ log.debug(message);
+ throw new RuntimeException(message);
+ }
+
+ if(storageManager == null) {
+ try {
+ storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
+ } catch (SandeshaException e2) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString());
+ log.debug(message);
+ throw new RuntimeException(message);
+ }
+ }
+
+ // Call into the real function
+ sleep = internalRun();
+ }
} finally {
// flag that we have exited the run loop and notify any waiting
// threads
@@ -238,6 +289,44 @@
hasStoppedRunning = true;
notify();
}
+ }
+ }
+
+ protected class SequenceEntry {
+ private String sequenceId;
+ private boolean rmSource;
+
+ public SequenceEntry(String sequenceId, boolean rmSource) {
+ this.sequenceId = sequenceId;
+ this.rmSource = rmSource;
+ }
+ public boolean isRmSource() {
+ return rmSource;
+ }
+ public String getSequenceId() {
+ return sequenceId;
+ }
+
+
+ public boolean equals(Object o) {
+ if(o == null) return false;
+ if(o == this) return true;
+ if(o.getClass() != getClass()) return false;
+
+ SequenceEntry other = (SequenceEntry) o;
+ if(sequenceId != null) {
+ if(!sequenceId.equals(other.sequenceId)) return false;
+ } else {
+ if(other.sequenceId != null) return false;
+ }
+
+ return rmSource == other.rmSource;
+ }
+ public int hashCode() {
+ int result = 1;
+ if(sequenceId != null) result = sequenceId.hashCode();
+ if(rmSource) result = -result;
+ return result;
}
}
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=504115&r1=504114&r2=504115
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Tue Feb 6 04:57:01 2007
@@ -20,14 +20,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
-import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
-import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
-import org.apache.sandesha2.util.SandeshaUtil;
/**
* This is responsible for sending and re-sending messages of Sandesha2. This
@@ -39,152 +36,104 @@
private static final Log log = LogFactory.getLog(Sender.class);
- public Sender () {
- super(Sandesha2Constants.SENDER_SLEEP_TIME);
- }
-
- protected void internalRun() {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::internalRun");
-
- if (context == null) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.configContextNotSet);
- message = SandeshaMessageHelper.getMessage(
- SandeshaMessageKeys.cannotCointinueSender, message);
- log.debug(message);
- throw new RuntimeException(message);
- }
+ public Sender () {
+ super(Sandesha2Constants.SENDER_SLEEP_TIME);
+ }
- StorageManager storageManager = null;
- boolean sleep = false;
+ protected boolean internalRun() {
+ if (log.isDebugEnabled()) log.debug("Enter: Sender::internalRun");
- try {
- storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
- } catch (SandeshaException e2) {
- // TODO Auto-generated catch block
- log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotCointinueSender, e2.toString()), e2);
- e2.printStackTrace();
- return;
- }
+ Transaction transaction = null;
- while (isThreadStarted()) {
+ try {
+ transaction = storageManager.getTransaction();
- try {
- synchronized (this) {
- if(sleep && !runMainLoop()) wait(Sandesha2Constants.SENDER_SLEEP_TIME);
- // Indicate that we are running the main loop
- setRanMainLoop();
+ SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
+ SenderBean senderBean = mgr.getNextMsgToSend();
+
+ if (senderBean == null) {
+ // As there was no work to do, we sleep for a while on the next loop.
+ if (log.isDebugEnabled()) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.senderBeanNotFound);
+ log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
}
- } catch (InterruptedException e1) {
- log.debug("Sender was interupted...");
- log.debug(e1.getMessage());
- log.debug("End printing Interrupt...");
- } finally {
- sleep = false;
+ return true;
}
- //pause if we have to
- doPauseIfNeeded();
-
- Transaction transaction = null;
-
- try {
-
- transaction = storageManager.getTransaction();
-
- SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
- SenderBean senderBean = mgr.getNextMsgToSend();
-
- if (senderBean == null) {
- if (log.isDebugEnabled()) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.senderBeanNotFound);
- log.debug(message);
- }
-
- // As there was no work to do, we sleep for a while on the next loop.
- sleep = true;
- continue;
+ // work Id is used to define the piece of work that will be
+ // assigned to the Worker thread,
+ // to handle this Sender bean.
+
+ //workId contains a timeToSend part to cater for retransmissions.
+ //This will cause retransmissions to be treated as new work.
+ String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
+
+ // check whether the bean is already assigned to a worker.
+ if (getWorkerLock().isWorkPresent(workId)) {
+ // As there is already a worker running we are probably looping
+ // too fast, so sleep on the next loop.
+ if (log.isDebugEnabled()) {
+ String message = SandeshaMessageHelper.getMessage(
+ SandeshaMessageKeys.workAlreadyAssigned,
+ workId);
+ log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
}
+ return true;
+ }
- // work Id is used to define the piece of work that will be
- // assigned to the Worker thread,
- // to handle this Sender bean.
-
- //workId contains a timeTiSend part to cater for retransmissions.
- //This will cause retransmissions to be treated as new work.
- String workId = senderBean.getMessageID() + senderBean.getTimeToSend();
-
- // check weather the bean is already assigned to a worker.
- if (getWorkerLock().isWorkPresent(workId)) {
- if (log.isDebugEnabled()) {
- String message = SandeshaMessageHelper
- .getMessage(
- SandeshaMessageKeys.workAlreadyAssigned,
- workId);
- log.debug(message);
- }
- // As there is already a worker running we are probably looping
- // too fast, so sleep on the next loop.
- sleep = true;
- continue;
- }
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
- if(transaction != null) {
- transaction.commit();
+ // start a worker which will work on this messages.
+ SenderWorker worker = new SenderWorker(context, senderBean);
+ worker.setLock(getWorkerLock());
+ worker.setWorkId(workId);
+ threadPool.execute(worker);
+
+ // adding the workId to the lock after assigning it to a thread
+ // makes sure
+ // that all the workIds in the Lock are handled by threads.
+ getWorkerLock().addWork(workId);
+
+ } catch (Exception e) {
+
+ // TODO : when this is the client side throw the exception to
+ // the client when necessary.
+
+
+ //TODO rollback only if a SandeshaStorageException.
+ //This allows the other Exceptions to be used within the Normal flow.
+
+ if (transaction != null) {
+ try {
+ transaction.rollback();
transaction = null;
+ } catch (Exception e1) {
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
+ .toString());
+ log.debug(message, e1);
}
+ }
- // start a worker which will work on this messages.
- SenderWorker worker = new SenderWorker(context, senderBean);
- worker.setLock(getWorkerLock());
- worker.setWorkId(workId);
- threadPool.execute(worker);
-
- // adding the workId to the lock after assigning it to a thread
- // makes sure
- // that all the workIds in the Lock are handled by threads.
- getWorkerLock().addWork(workId);
-
- } catch (Exception e) {
-
- // TODO : when this is the client side throw the exception to
- // the client when necessary.
-
-
- //TODO rollback only if a SandeshaStorageException.
- //This allows the other Exceptions to be used within the Normal flow.
-
- if (transaction != null) {
- try {
- transaction.rollback();
- transaction = null;
- } catch (Exception e1) {
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
- .toString());
- log.debug(message, e1);
- }
- }
-
- String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
+ String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
- log.debug(message, e);
- } finally {
- if (transaction != null) {
- try {
- transaction.commit();
- transaction = null;
- } catch (Exception e) {
- String message = SandeshaMessageHelper
- .getMessage(SandeshaMessageKeys.commitError, e.toString());
- log.debug(message, e);
- }
+ log.debug(message, e);
+ } finally {
+ if (transaction != null) {
+ try {
+ transaction.commit();
+ transaction = null;
+ } catch (Exception e) {
+ String message = SandeshaMessageHelper
+ .getMessage(SandeshaMessageKeys.commitError, e.toString());
+ log.debug(message, e);
}
}
}
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::internalRun");
+ if (log.isDebugEnabled()) log.debug("Exit: Sender::internalRun, not sleeping");
+ return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org