You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ml...@apache.org on 2006/12/06 12:00:57 UTC

svn commit: r483026 - in /webservices/sandesha/trunk/java/src/org/apache/sandesha2: i18n/SandeshaMessageKeys.java i18n/resource.properties workers/Invoker.java workers/Sender.java

Author: mlovett
Date: Wed Dec  6 03:00:57 2006
New Revision: 483026

URL: http://svn.apache.org/viewvc?view=rev&rev=483026
Log:
Apply Tom's patch to refactor sender and invoker threads, see SANDESHA2-61

Modified:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java Wed Dec  6 03:00:57 2006
@@ -12,6 +12,7 @@
 	public static final String cannotInitSecurityManager="cannotInitSecurityManager";
 	public static final String securityManagerMustImplement="securityManagerMustImplement";
 	public static final String cannotFindModulePolicies="cannotFindModulePolicies";
+	public static final String cannotPauseThread = "cannotPauseThread";
 
 	public static final String commitError="commitError";
 	public static final String rollbackError="rollbackError";

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties Wed Dec  6 03:00:57 2006
@@ -29,6 +29,7 @@
 cannotInitSecurityManager=Cannot initialize the given security manager due to exception {0}.
 securityManagerMustImplement=SecurityManager {0} must implement the org.apache.sandesha2.storage.StorageManager interface.
 cannotFindModulePolicies=No policies were found in the module.xml at the module initiation time
+cannotPauseThread=Cannot pause a non-running thread.
 
 commitError=Exception thrown when trying to commit the transaction: {0}
 rollbackError=Exception thrown when trying to rollback the transaction: {0}

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java Wed Dec  6 03:00:57 2006
@@ -22,8 +22,6 @@
 import java.util.List;
 
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -48,25 +46,16 @@
  * to find weather there are any messages to me invoked.
  */
 
-public class Invoker extends Thread {
+public class Invoker extends SandeshaThread {
 
-	private boolean runInvoker = false;
-	private ArrayList workingSequences = new ArrayList();
-	private ConfigurationContext context = null;
 	private static final Log log = LogFactory.getLog(Invoker.class);
 	
-	private boolean hasStoppedInvoking = false;
-	private boolean hasPausedInvoking = false;
-	private boolean pauseRequired = false;
-	
-	private transient ThreadFactory threadPool;
-	public int INVOKER_THREADPOOL_SIZE = 5;
+	public static final int INVOKER_THREADPOOL_SIZE = 5;
 
 	private WorkerLock lock = null;
 	
 	public Invoker() {
-		threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
-				INVOKER_THREADPOOL_SIZE);
+		super(INVOKER_THREADPOOL_SIZE, Sandesha2Constants.INVOKER_SLEEP_TIME);
 		lock = new WorkerLock ();
 	}
 
@@ -75,53 +64,12 @@
 			log.debug("Enter: InOrderInvoker::stopInvokerForTheSequence, "
 					+ sequenceID);
 
-		workingSequences.remove(sequenceID);
-		if (workingSequences.size() == 0) {
-			runInvoker = false;
-		}
+		super.stopThreadForSequence(sequenceID);
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: InOrderInvoker::stopInvokerForTheSequence");
 	}
 	
-	
-	/**
-	 * Waits for the invoking thread to pause
-	 */
-	public synchronized void blockForPause(){
-		while(pauseRequired){
-			//someone else is requesting a pause - wait for them to finish
-			try{
-				wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-			}catch(InterruptedException e){
-				//ignore
-			}
-		}
-		
-	  //we can now request a pause - the next pause will be ours
-	  pauseRequired = true;
-				
-		if(hasStoppedInvoking() || !isInvokerStarted()){
-			throw new IllegalStateException("Cannot pause a non-running invoker thread"); //TODO NLS
-		}
-		while(!hasPausedInvoking){
-			//wait for our pause to come around
-			try{
-				wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-			}catch(InterruptedException e){
-				//ignore
-			}
-			
-		}
-		//the invoker thread is now paused
-	}
-	
-	private synchronized void finishPause(){
-		//indicate that the current pause is no longer required.
-		pauseRequired = false;
-		notifyAll();
-	}
-	
 	/**
 	 * Forces dispatch of queued messages to the application.
 	 * NOTE: may break ordering
@@ -242,31 +190,21 @@
 	public synchronized void stopInvoking() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: InOrderInvoker::stopInvoking");
-		//NOTE: we do not take acount of pausing when stopping.
-		//The call to stop will wait until the invoker has exited the loop
-		if (isInvokerStarted()) {
-			// the invoker is started so stop it
-			runInvoker = false;
-			// wait for it to finish
-			while (!hasStoppedInvoking()) {
-				try {
-					wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-				} catch (InterruptedException e1) {
-					log.debug(e1.getMessage());
-				}
-			}
-		}
+
+		super.stopRunning();
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: InOrderInvoker::stopInvoking");
 	}
 
 	public synchronized boolean isInvokerStarted() {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: InOrderInvoker::isInvokerStarted");
-			log.debug("Exit: InOrderInvoker::isInvokerStarted, " + runInvoker);
+		boolean isThreadStarted = super.isThreadStarted();
+		if(!isThreadStarted){
+			//to avoid too much noise we should only trace if the invoker is not started
+			if (log.isDebugEnabled())
+				log.debug("invoker not started");	
 		}
-		return runInvoker;
+		return isThreadStarted;
 	}
 
 	public synchronized void runInvokerForTheSequence(
@@ -274,47 +212,12 @@
 		if (log.isDebugEnabled())
 			log.debug("Enter: InOrderInvoker::runInvokerForTheSequence");
 
-		if (!workingSequences.contains(sequenceID))
-			workingSequences.add(sequenceID);
-
-		if (!isInvokerStarted()) {
-			this.context = context;
-			runInvoker = true; // so that isSenderStarted()=true.
-			super.start();
-		}
+		super.runThreadForSequence(context, sequenceID);
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: InOrderInvoker::runInvokerForTheSequence");
 	}
 
-	private synchronized boolean hasStoppedInvoking() {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
-			log
-					.debug("Exit: InOrderInvoker::hasStoppedInvoking, "
-							+ hasStoppedInvoking);
-		}
-		return hasStoppedInvoking;
-	}
-
-	public void run() {
-		if (log.isDebugEnabled())
-			log.debug("Enter: InOrderInvoker::run");
-
-		try {
-			internalRun();
-		} finally {
-			// flag that we have exited the run loop and notify any waiting
-			// threads
-			synchronized (this) {
-				hasStoppedInvoking = true;
-				notify();
-			}
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: InOrderInvoker::run");
-	}
-
 	private void addOutOfOrderInvokerBeansToList(String sequenceID, 
 			StorageManager strMgr, List list)throws SandeshaException{
 		if (log.isDebugEnabled())
@@ -351,7 +254,7 @@
 			log.debug("Exit: InOrderInvoker::addOutOfOrderInvokerBeansToList");
 	}
 	
-	private void internalRun() {
+	protected void internalRun() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: InOrderInvoker::internalRun");
 		
@@ -368,25 +271,8 @@
 				log.debug(ex.getMessage());
 			}
 					
-			//see if we need to pause
-			synchronized(this){
-				
-				while(pauseRequired){
-					if(!hasPausedInvoking){
-						//let the requester of this pause know we are now pausing
-					  hasPausedInvoking = true;
-					  notifyAll();						
-					}
-					//now we pause
-				  try{
-				  	wait(Sandesha2Constants.INVOKER_SLEEP_TIME);
-				  }catch(InterruptedException e){
-				  	//ignore
-				  }
-				}//end while
-				//the request to pause has finished so we are no longer pausing
-				hasPausedInvoking = false;
-			}
+			//pause if we have to
+			doPauseIfNeeded();
 
 			Transaction transaction = null;
 			boolean rolebacked = false;

Modified: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?view=diff&rev=483026&r1=483025&r2=483026
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java (original)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java Wed Dec  6 03:00:57 2006
@@ -17,12 +17,8 @@
 
 package org.apache.sandesha2.workers;
 
-import java.util.ArrayList;
-
 import org.apache.axis2.addressing.AddressingConstants;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.util.threadpool.ThreadFactory;
-import org.apache.axis2.util.threadpool.ThreadPool;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sandesha2.Sandesha2Constants;
@@ -41,92 +37,54 @@
  * Sender table to find out any entries that should be sent.
  */
 
-public class Sender extends Thread {
+public class Sender extends SandeshaThread {
+
 
-	private boolean runSender = false;
-	private ArrayList workingSequences = new ArrayList();
-	private ConfigurationContext context = null;
 	private static final Log log = LogFactory.getLog(Sender.class);
-	private boolean hasStopped = false;
 	
-    private transient ThreadFactory threadPool;
-    public int SENDER_THREADPOOL_SIZE = 5;
-    
-    private WorkerLock lock = null;
+
+  public static final int SENDER_THREADPOOL_SIZE = 5;
+  
+  private WorkerLock lock = null;
     
     public Sender () {
-    	threadPool = new ThreadPool (SENDER_THREADPOOL_SIZE,SENDER_THREADPOOL_SIZE);
+    	super(SENDER_THREADPOOL_SIZE, Sandesha2Constants.SENDER_SLEEP_TIME);
     	lock = new WorkerLock ();
     }
 
 	public synchronized void stopSenderForTheSequence(String sequenceID) {
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::stopSenderForTheSequence, " + sequenceID);
-		workingSequences.remove(sequenceID);
-		if (workingSequences.size() == 0) {
-			runSender = false;
-		}
+		
+		super.stopThreadForSequence(sequenceID);
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::stopSenderForTheSequence");
 	}
+	
 
 	public synchronized void stopSending() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::stopSending");
 
-		if (isSenderStarted()) {
-			// the sender is started so stop it
-			runSender = false;
-			// wait for it to finish
-			while (!hasStoppedSending()) {
-				try {
-					wait(Sandesha2Constants.SENDER_SLEEP_TIME);
-				} catch (InterruptedException e1) {
-					log.debug(e1.getMessage());
-				}
-			}
-		}
+		super.stopRunning();
 
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::stopSending");
 	}
 
-	private synchronized boolean hasStoppedSending() {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: Sender::hasStoppedSending");
-			log.debug("Exit: Sender::hasStoppedSending, " + hasStopped);
-		}
-		return hasStopped;
-	}
-
 	public synchronized boolean isSenderStarted() {
-		if (log.isDebugEnabled()) {
-			log.debug("Enter: Sender::isSenderStarted");
-			log.debug("Exit: Sender::isSenderStarted, " + runSender);
+		boolean isThreadStarted = super.isThreadStarted();
+		if(!isThreadStarted){
+			//to avoid too much noise we should only trace if the sender is not started
+			if (log.isDebugEnabled())
+				log.debug("sender not started");	
 		}
-		return runSender;
+		return isThreadStarted;
 	}
 
-	public void run() {
-		if (log.isDebugEnabled())
-			log.debug("Enter: Sender::run");
-
-		try {
-			internalRun();
-		} finally {
-			// flag that we have exited the run loop and notify any waiting
-			// threads
-			synchronized (this) {
-				hasStopped = true;
-				notify();
-			}
-		}
-
-		if (log.isDebugEnabled())
-			log.debug("Exit: Sender::run");
-	}
 
-	private void internalRun() {
+	protected void internalRun() {
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::internalRun");
 
@@ -151,6 +109,9 @@
 				log.debug(e1.getMessage());
 				log.debug("End printing Interrupt...");
 			}
+			
+			//pause if we have to
+			doPauseIfNeeded();
 
 			Transaction transaction = null;
 			boolean rolebacked = false;
@@ -164,7 +125,7 @@
 					log.debug(message);
 					throw new SandeshaException(message);
 				}
-
+				
 				// TODO make sure this locks on reads.
 				transaction = storageManager.getTransaction();
 
@@ -277,14 +238,8 @@
 		if (log.isDebugEnabled())
 			log.debug("Enter: Sender::runSenderForTheSequence, " + sequenceID);
 
-		if (sequenceID != null && !workingSequences.contains(sequenceID))
-			workingSequences.add(sequenceID);
-
-		if (!isSenderStarted()) {
-			this.context = context;
-			runSender = true; // so that isSenderStarted()=true.
-			super.start();
-		}
+		runThreadForSequence(context, sequenceID);
+		
 		if (log.isDebugEnabled())
 			log.debug("Exit: Sender::runSenderForTheSequence");
 	}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org