You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/12/06 12:00:57 UTC
svn commit: r483026 - in
/webservices/sandesha/trunk/java/src/org/apache/sandesha2:
i18n/SandeshaMessageKeys.java i18n/resource.properties workers/Invoker.java
workers/Sender.java
Author: mlovett
Date: Wed Dec 6 03:00:57 2006
New Revision: 483026
URL: http://svn.apache.org/viewvc?view=rev&rev=483026
Log:
Apply Tom's patch to refactor sender and invoker threads, see SANDESHA2-61
Modified:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Wed Dec 6 03:00:57 2006
@@ -12,6 +12,7 @@
public static final String cannotInitSecurityManager="cannotInitSecurityManager";
public static final String securityManagerMustImplement="securityManagerMustImplement";
public static final String cannotFindModulePolicies="cannotFindModulePolicies";
+ public static final String cannotPauseThread = "cannotPauseThread";
public static final String commitError="commitError";
public static final String rollbackError="rollbackError";
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Wed Dec 6 03:00:57 2006
@@ -29,6 +29,7 @@
cannotInitSecurityManager=Cannot initialize the given security manager due to exception {0}.
securityManagerMustImplement=SecurityManager {0} must implement the org.apache.sandesha2.storage.StorageManager interface.
cannotFindModulePolicies=No policies were found in the module.xml at the module initiation time
+cannotPauseThread=Cannot pause a non-running thread.
commitError=Exception thrown when trying to commit the transaction: {0}
rollbackError=Exception thrown when trying to rollback the transaction: {0}
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Wed Dec 6 03:00:57 2006
@@ -22,8 +22,6 @@
import java.util.List;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
@@ -48,25 +46,16 @@
* to find whether there are any messages to be invoked.
*/
-public class Invoker extends Thread {
+public class Invoker extends SandeshaThread {
- private boolean runInvoker = false;
- private ArrayList workingSequences = new ArrayList();
- private ConfigurationContext context = null;
private static final Log log = LogFactory.getLog(Invoker.class);
- private boolean hasStoppedInvoking = false;
- private boolean hasPausedInvoking = false;
- private boolean pauseRequired = false;
-
- private transient ThreadFactory threadPool;
- public int INVOKER_THREADPOOL_SIZE = 5;
+ public static final int INVOKER_THREADPOOL_SIZE = 5;
private WorkerLock lock = null;
public Invoker() {
- threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
- INVOKER_THREADPOOL_SIZE);
+ super(INVOKER_THREADPOOL_SIZE, Sandesha2Constants.INVOKER_SLEEP_TIME);
lock = new WorkerLock ();
}
@@ -75,53 +64,12 @@
log.debug("Enter: InOrderInvoker::stopInvokerForTheSequence, "
+ sequenceID);
- workingSequences.remove(sequenceID);
- if (workingSequences.size() == 0) {
- runInvoker = false;
- }
+ super.stopThreadForSequence(sequenceID);
if (log.isDebugEnabled())
log.debug("Exit: InOrderInvoker::stopInvokerForTheSequence");
}
-
- /**
- * Waits for the invoking thread to pause
- */
- public synchronized void blockForPause(){
- while(pauseRequired){
- //someone else is requesting a pause - wait for them to finish
- try{
- wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
- }catch(InterruptedException e){
- //ignore
- }
- }
-
- //we can now request a pause - the next pause will be ours
- pauseRequired = true;
-
- if(hasStoppedInvoking() || !isInvokerStarted()){
- throw new IllegalStateException("Cannot pause a non-running invoker thread"); //TODO NLS
- }
- while(!hasPausedInvoking){
- //wait for our pause to come around
- try{
- wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
- }catch(InterruptedException e){
- //ignore
- }
-
- }
- //the invoker thread is now paused
- }
-
- private synchronized void finishPause(){
- //indicate that the current pause is no longer required.
- pauseRequired = false;
- notifyAll();
- }
-
/**
* Forces dispatch of queued messages to the application.
* NOTE: may break ordering
@@ -242,31 +190,21 @@
public synchronized void stopInvoking() {
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::stopInvoking");
- //NOTE: we do not take acount of pausing when stopping.
- //The call to stop will wait until the invoker has exited the loop
- if (isInvokerStarted()) {
- // the invoker is started so stop it
- runInvoker = false;
- // wait for it to finish
- while (!hasStoppedInvoking()) {
- try {
- wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
- } catch (InterruptedException e1) {
- log.debug(e1.getMessage());
- }
- }
- }
+
+ super.stopRunning();
if (log.isDebugEnabled())
log.debug("Exit: InOrderInvoker::stopInvoking");
}
public synchronized boolean isInvokerStarted() {
- if (log.isDebugEnabled()) {
- log.debug("Enter: InOrderInvoker::isInvokerStarted");
- log.debug("Exit: InOrderInvoker::isInvokerStarted, " + runInvoker);
+ boolean isThreadStarted = super.isThreadStarted();
+ if(!isThreadStarted){
+ //to avoid too much noise we should only trace if the invoker is not started
+ if (log.isDebugEnabled())
+ log.debug("invoker not started");
}
- return runInvoker;
+ return isThreadStarted;
}
public synchronized void runInvokerForTheSequence(
@@ -274,47 +212,12 @@
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::runInvokerForTheSequence");
- if (!workingSequences.contains(sequenceID))
- workingSequences.add(sequenceID);
-
- if (!isInvokerStarted()) {
- this.context = context;
- runInvoker = true; // so that isSenderStarted()=true.
- super.start();
- }
+ super.runThreadForSequence(context, sequenceID);
+
if (log.isDebugEnabled())
log.debug("Exit: InOrderInvoker::runInvokerForTheSequence");
}
- private synchronized boolean hasStoppedInvoking() {
- if (log.isDebugEnabled()) {
- log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
- log
- .debug("Exit: InOrderInvoker::hasStoppedInvoking, "
- + hasStoppedInvoking);
- }
- return hasStoppedInvoking;
- }
-
- public void run() {
- if (log.isDebugEnabled())
- log.debug("Enter: InOrderInvoker::run");
-
- try {
- internalRun();
- } finally {
- // flag that we have exited the run loop and notify any waiting
- // threads
- synchronized (this) {
- hasStoppedInvoking = true;
- notify();
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: InOrderInvoker::run");
- }
-
private void addOutOfOrderInvokerBeansToList(String sequenceID,
StorageManager strMgr, List list)throws SandeshaException{
if (log.isDebugEnabled())
@@ -351,7 +254,7 @@
log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
}
- private void internalRun() {
+ protected void internalRun() {
if (log.isDebugEnabled())
log.debug("Enter: InOrderInvoker::internalRun");
@@ -368,25 +271,8 @@
log.debug(ex.getMessage());
}
- //see if we need to pause
- synchronized(this){
-
- while(pauseRequired){
- if(!hasPausedInvoking){
- //let the requester of this pause know we are now pausing
- hasPausedInvoking = true;
- notifyAll();
- }
- //now we pause
- try{
- wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
- }catch(InterruptedException e){
- //ignore
- }
- }//end while
- //the request to pause has finished so we are no longer pausing
- hasPausedInvoking = false;
- }
+ //pause if we have to
+ doPauseIfNeeded();
Transaction transaction = null;
boolean rolebacked = false;
Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Wed Dec 6 03:00:57 2006
@@ -17,12 +17,8 @@
package org.apache.sandesha2.workers;
-import java.util.ArrayList;
-
import org.apache.axis2.addressing.AddressingConstants;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.Sandesha2Constants;
@@ -41,92 +37,54 @@
* Sender table to find out any entries that should be sent.
*/
-public class Sender extends Thread {
+public class Sender extends SandeshaThread {
+
- private boolean runSender = false;
- private ArrayList workingSequences = new ArrayList();
- private ConfigurationContext context = null;
private static final Log log = LogFactory.getLog(Sender.class);
- private boolean hasStopped = false;
- private transient ThreadFactory threadPool;
- public int SENDER_THREADPOOL_SIZE = 5;
-
- private WorkerLock lock = null;
+
+ public static final int SENDER_THREADPOOL_SIZE = 5;
+
+ private WorkerLock lock = null;
public Sender () {
- threadPool = new ThreadPool (SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+ super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
lock = new WorkerLock ();
}
public synchronized void stopSenderForTheSequence(String sequenceID) {
if (log.isDebugEnabled())
log.debug("Enter: Sender::stopSenderForTheSequence, " + sequenceID);
- workingSequences.remove(sequenceID);
- if (workingSequences.size() == 0) {
- runSender = false;
- }
+
+ super.stopThreadForSequence(sequenceID);
+
if (log.isDebugEnabled())
log.debug("Exit: Sender::stopSenderForTheSequence");
}
+
public synchronized void stopSending() {
if (log.isDebugEnabled())
log.debug("Enter: Sender::stopSending");
- if (isSenderStarted()) {
- // the sender is started so stop it
- runSender = false;
- // wait for it to finish
- while (!hasStoppedSending()) {
- try {
- wait(Sandesha2Constants.SENDER_SLEEP_TIME);
- } catch (InterruptedException e1) {
- log.debug(e1.getMessage());
- }
- }
- }
+ super.stopRunning();
if (log.isDebugEnabled())
log.debug("Exit: Sender::stopSending");
}
- private synchronized boolean hasStoppedSending() {
- if (log.isDebugEnabled()) {
- log.debug("Enter: Sender::hasStoppedSending");
- log.debug("Exit: Sender::hasStoppedSending, " + hasStopped);
- }
- return hasStopped;
- }
-
public synchronized boolean isSenderStarted() {
- if (log.isDebugEnabled()) {
- log.debug("Enter: Sender::isSenderStarted");
- log.debug("Exit: Sender::isSenderStarted, " + runSender);
+ boolean isThreadStarted = super.isThreadStarted();
+ if(!isThreadStarted){
+ //to avoid too much noise we should only trace if the sender is not started
+ if (log.isDebugEnabled())
+ log.debug("sender not started");
}
- return runSender;
+ return isThreadStarted;
}
- public void run() {
- if (log.isDebugEnabled())
- log.debug("Enter: Sender::run");
-
- try {
- internalRun();
- } finally {
- // flag that we have exited the run loop and notify any waiting
- // threads
- synchronized (this) {
- hasStopped = true;
- notify();
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Exit: Sender::run");
- }
- private void internalRun() {
+ protected void internalRun() {
if (log.isDebugEnabled())
log.debug("Enter: Sender::internalRun");
@@ -151,6 +109,9 @@
log.debug(e1.getMessage());
log.debug("End printing Interrupt...");
}
+
+ //pause if we have to
+ doPauseIfNeeded();
Transaction transaction = null;
boolean rolebacked = false;
@@ -164,7 +125,7 @@
log.debug(message);
throw new SandeshaException(message);
}
-
+
// TODO make sure this locks on reads.
transaction = storageManager.getTransaction();
@@ -277,14 +238,8 @@
if (log.isDebugEnabled())
log.debug("Enter: Sender::runSenderForTheSequence, " + sequenceID);
- if (sequenceID != null && !workingSequences.contains(sequenceID))
- workingSequences.add(sequenceID);
-
- if (!isSenderStarted()) {
- this.context = context;
- runSender = true; // so that isSenderStarted()=true.
- super.start();
- }
+ runThreadForSequence(context, sequenceID);
+
if (log.isDebugEnabled())
log.debug("Exit: Sender::runSenderForTheSequence");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org