You are viewing a plain text version of this content. The canonical link for it is here.
Posted to fx-dev@ws.apache.org by ga...@apache.org on 2006/12/07 17:04:04 UTC
svn commit: r483508 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2: ./ client/
storage/ storage/inmemory/ util/ workers/
Author: gatfora
Date: Thu Dec 7 08:04:03 2006
New Revision: 483508
URL: http://svn.apache.org/viewvc?view=rev&rev=483508
Log:
Another update to consolidate the common processing of the Sender and Invoker; see SANDESHA2-61.
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/Sandesha2Constants.java Thu Dec 7 08:04:03 2006
@@ -538,10 +538,6 @@
String DEFAULT_STORAGE_MANAGER = INMEMORY_STORAGE_MANAGER;
- String SENDER = "Sender";
-
- String INVOKER = "Invoker";
-
String POLLING_MANAGER = "PollingManager";
String WITHIN_TRANSACTION = "WithinTransaction";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/client/SandeshaClient.java Thu Dec 7 08:04:03 2006
@@ -821,7 +821,7 @@
//only do this if we are running inOrder
if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
- Invoker invoker = (Invoker) configContext.getProperty(Sandesha2Constants.INVOKER);
+ Invoker invoker = (Invoker)SandeshaUtil.getSandeshaStorageManager(configContext, configContext.getAxisConfiguration()).getInvoker();
if (invoker==null){
throw new SandeshaException(SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.invokerNotFound, sequenceID));
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/StorageManager.java Thu Dec 7 08:04:03 2006
@@ -25,6 +25,7 @@
import org.apache.sandesha2.storage.beanmanagers.NextMsgBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
+import org.apache.sandesha2.workers.SandeshaThread;
/**
* Storage managers should extend this.
@@ -51,6 +52,10 @@
public abstract void initStorage (AxisModule moduleDesc) throws SandeshaStorageException;
public abstract Transaction getTransaction();
+
+ public abstract SandeshaThread getSender();
+
+ public abstract SandeshaThread getInvoker();
public abstract CreateSeqBeanMgr getCreateSeqBeanMgr();
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java Thu Dec 7 08:04:03 2006
@@ -38,6 +38,9 @@
import org.apache.sandesha2.storage.beanmanagers.SequencePropertyBeanMgr;
import org.apache.sandesha2.storage.beans.RMBean;
import org.apache.sandesha2.util.SandeshaUtil;
+import org.apache.sandesha2.workers.Invoker;
+import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.Sender;
public class InMemoryStorageManager extends StorageManager {
@@ -50,6 +53,8 @@
private SequencePropertyBeanMgr sequencePropertyBeanMgr = null;
private SenderBeanMgr senderBeanMgr = null;
private InvokerBeanMgr invokerBeanMgr = null;
+ private Sender sender = null;
+ private Invoker invoker = null;
private HashMap transactions = new HashMap();
public InMemoryStorageManager(ConfigurationContext context) {
@@ -60,6 +65,8 @@
this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this, context);
this.sequencePropertyBeanMgr = new InMemorySequencePropertyBeanMgr (this, context);
+ this.sender = new Sender();
+ this.invoker = new Invoker();
}
public Transaction getTransaction() {
@@ -91,6 +98,20 @@
}
}
+ /**
+ * Gets the Invoker for this Storage manager
+ */
+ public SandeshaThread getInvoker() {
+ return invoker;
+ }
+
+ /**
+ * Gets the Sender for this Storage manager
+ */
+ public SandeshaThread getSender() {
+ return sender;
+ }
+
void enlistBean(RMBean bean) throws SandeshaStorageException {
InMemoryTransaction t = null;
synchronized (transactions) {
@@ -201,7 +222,9 @@
public void storeSOAPEnvelope(SOAPEnvelope envelope, String key) throws SandeshaStorageException {
// TODO no real value
}
+
}
+
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SandeshaUtil.java Thu Dec 7 08:04:03 2006
@@ -78,8 +78,7 @@
import org.apache.sandesha2.storage.beans.CreateSeqBean;
import org.apache.sandesha2.storage.beans.SequencePropertyBean;
import org.apache.sandesha2.transport.Sandesha2TransportOutDesc;
-import org.apache.sandesha2.workers.Invoker;
-import org.apache.sandesha2.workers.Sender;
+import org.apache.sandesha2.workers.SandeshaThread;
import org.apache.sandesha2.wsrm.AckRequested;
import org.apache.sandesha2.wsrm.AcknowledgementRange;
import org.apache.sandesha2.wsrm.CloseSequence;
@@ -213,44 +212,38 @@
return sortedList;
}
- public static void startSenderForTheSequence(ConfigurationContext context, String sequenceID) {
+ public static void startSenderForTheSequence(ConfigurationContext context, String sequenceID) throws SandeshaException {
if (log.isDebugEnabled())
log.debug("Enter: SandeshaUtil::startSenderForTheSequence , context " + context + ", sequenceID " + sequenceID);
- Sender sender = (Sender) context.getProperty(Sandesha2Constants.SENDER);
-
- if (sender!=null)
- sender.runSenderForTheSequence(context, sequenceID);
- else {
- sender = new Sender ();
- context.setProperty(Sandesha2Constants.SENDER,sender);
- sender.runSenderForTheSequence(context, sequenceID);
- }
+ SandeshaThread sender = getSandeshaStorageManager(context, context.getAxisConfiguration()).getSender();
+ sender.runThreadForSequence(context, sequenceID);
if (log.isDebugEnabled())
log.debug("Exit: SandeshaUtil::startSenderForTheSequence");
}
- public static void stopSender(ConfigurationContext context) {
- Sender sender = (Sender) context.getProperty(Sandesha2Constants.SENDER);
-
- if (sender!=null) {
- sender.stopSending ();
- }
+ public static void stopSender(ConfigurationContext context) throws SandeshaException {
+ SandeshaThread sender = getSandeshaStorageManager(context, context.getAxisConfiguration()).getSender();
+ sender.stopRunning();
}
- public static void startInvokerForTheSequence(ConfigurationContext context, String sequenceID) {
+ public static void startInvokerForTheSequence(ConfigurationContext context, String sequenceID) throws SandeshaException {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaUtil::startInvokerForTheSequence , context " + context + ", sequenceID " + sequenceID);
- Invoker invoker = (Invoker) context.getProperty(Sandesha2Constants.INVOKER);
- if (invoker!=null)
- invoker.runInvokerForTheSequence(context,sequenceID);
- else {
- invoker = new Invoker ();
- context.setProperty(Sandesha2Constants.INVOKER,invoker);
- invoker.runInvokerForTheSequence(context,sequenceID);
- }
+ SandeshaThread invoker = getSandeshaStorageManager(context, context.getAxisConfiguration()).getInvoker();
+ invoker.runThreadForSequence(context,sequenceID);
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaUtil::startInvokerForTheSequence");
}
-
+
+ public static void stopInvoker(ConfigurationContext context) throws SandeshaException {
+ SandeshaThread invoker = getSandeshaStorageManager(context, context.getAxisConfiguration()).getInvoker();
+ invoker.stopRunning();
+ }
+
public static void startPollingManager (ConfigurationContext configurationContext) throws SandeshaException {
PollingManager pollingManager = (PollingManager) configurationContext.getProperty(
Sandesha2Constants.POLLING_MANAGER);
@@ -270,12 +263,6 @@
pollingManager.stopPolling ();
}
- public static void stopInvoker(ConfigurationContext context) {
- Invoker invoker = (Invoker) context.getProperty(Sandesha2Constants.INVOKER);
- if (invoker!=null)
- invoker.stopInvoking();
- }
-
public static String getMessageTypeString(int messageType) {
switch (messageType) {
case Sandesha2Constants.MessageTypes.CREATE_SEQ:
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Thu Dec 7 08:04:03 2006
@@ -58,17 +58,6 @@
super(INVOKER_THREADPOOL_SIZE, Sandesha2Constants.INVOKER_SLEEP_TIME);
lock = new WorkerLock ();
}
-
- public synchronized void stopInvokerForTheSequence(String sequenceID) {
- if (log.isDebugEnabled())
- log.debug("Enter: InOrderInvoker::stopInvokerForTheSequence, "
- + sequenceID);
-
- super.stopThreadForSequence(sequenceID);
-
- if (log.isDebugEnabled())
- log.debug("Exit: InOrderInvoker::stopInvokerForTheSequence");
- }
/**
* Forces dispatch of queued messages to the application.
@@ -187,37 +176,6 @@
}
}
- public synchronized void stopInvoking() {
- if (log.isDebugEnabled())
- log.debug("Enter: InOrderInvoker::stopInvoking");
-
- super.stopRunning();
-
- if (log.isDebugEnabled())
- log.debug("Exit: InOrderInvoker::stopInvoking");
- }
-
- public synchronized boolean isInvokerStarted() {
- boolean isThreadStarted = super.isThreadStarted();
- if(!isThreadStarted){
- //to avoid too much noise we should only trace if the invoker is not started
- if (log.isDebugEnabled())
- log.debug("invoker not started");
- }
- return isThreadStarted;
- }
-
- public synchronized void runInvokerForTheSequence(
- ConfigurationContext context, String sequenceID) {
- if (log.isDebugEnabled())
- log.debug("Enter: InOrderInvoker::runInvokerForTheSequence");
-
- super.runThreadForSequence(context, sequenceID);
-
- if (log.isDebugEnabled())
- log.debug("Exit: InOrderInvoker::runInvokerForTheSequence");
- }
-
private void addOutOfOrderInvokerBeansToList(String sequenceID,
StorageManager strMgr, List list)throws SandeshaException{
if (log.isDebugEnabled())
@@ -262,7 +220,7 @@
// try and give them all a chance to invoke messages.
int nextIndex = 0;
- while (isInvokerStarted()) {
+ while (isThreadStarted()) {
try {
Thread.sleep(Sandesha2Constants.INVOKER_SLEEP_TIME);
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Thu Dec 7 08:04:03 2006
@@ -21,6 +21,8 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.util.threadpool.ThreadFactory;
import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
@@ -29,6 +31,8 @@
*/
public abstract class SandeshaThread extends Thread{
+ private static final Log log = LogFactory.getLog(SandeshaThread.class);
+
private boolean runThread = false;
private boolean hasStoppedRunning = false;
private boolean hasPausedRunning = false;
@@ -46,11 +50,16 @@
this.sleepTime = sleepTime;
}
- protected void stopThreadForSequence(String sequenceID){
+ public synchronized void stopThreadForSequence(String sequenceID){
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaThread::stopThreadForSequence, " + sequenceID);
+
workingSequences.remove(sequenceID);
- if (workingSequences.size() == 0) {
- runThread = false;
- }
+ if (workingSequences.size() == 0)
+ runThread = false;
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaThread::stopThreadForSequence");
}
/**
@@ -91,6 +100,9 @@
}
public synchronized void stopRunning() {
+ if (log.isDebugEnabled())
+ log.debug("Enter: SandeshaThread::stopRunning");
+
//NOTE: we do not take account of pausing when stopping.
//The call to stop will wait until the invoker has exited the loop
if (isThreadStarted()) {
@@ -105,10 +117,16 @@
}
}
}
-
+
+ if (log.isDebugEnabled())
+ log.debug("Exit: SandeshaThread::stopRunning");
}
public synchronized boolean isThreadStarted() {
+
+ if (!runThread && log.isDebugEnabled())
+ log.debug("SandeshaThread not started");
+
return runThread;
}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=483508&r1=483507&r2=483508
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Thu Dec 7 08:04:03 2006
@@ -18,7 +18,6 @@
package org.apache.sandesha2.workers;
import org.apache.axis2.addressing.AddressingConstants;
-import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
@@ -47,42 +46,10 @@
private WorkerLock lock = null;
- public Sender () {
- super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
- lock = new WorkerLock ();
- }
-
- public synchronized void stopSenderForTheSequence(String sequenceID) {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::stopSenderForTheSequence, " + sequenceID);
-
- super.stopThreadForSequence(sequenceID);
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::stopSenderForTheSequence");
- }
-
-
- public synchronized void stopSending() {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::stopSending");
-
- super.stopRunning();
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::stopSending");
- }
-
- public synchronized boolean isSenderStarted() {
- boolean isThreadStarted = super.isThreadStarted();
- if(!isThreadStarted){
- //to avoid too much noise we should only trace if the sender is not started
- if (log.isDebugEnabled())
- log.debug("sender not started");
- }
- return isThreadStarted;
- }
-
+ public Sender () {
+ super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
+ lock = new WorkerLock ();
+ }
protected void internalRun() {
if (log.isDebugEnabled())
@@ -99,7 +66,7 @@
return;
}
- while (isSenderStarted()) {
+ while (isThreadStarted()) {
try {
Thread.sleep(Sandesha2Constants.SENDER_SLEEP_TIME);
@@ -232,16 +199,6 @@
}
if (log.isDebugEnabled())
log.debug("Exit: Sender::internalRun");
- }
-
- public synchronized void runSenderForTheSequence(ConfigurationContext context, String sequenceID) {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::runSenderForTheSequence, " + sequenceID);
-
- runThreadForSequence(context, sequenceID);
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::runSenderForTheSequence");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org