You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2006/12/06 12:50:12 UTC

svn commit: r483043 - /webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java

Author: gatfora
Date: Wed Dec  6 03:50:07 2006
New Revision: 483043

URL: http://svn.apache.org/viewvc?view=rev&rev=483043
Log:
Adding the SandeshaThread see SANDESHA2-61

Added:
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java

Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=auto&rev=483043
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Wed Dec  6 03:50:07 2006
@@ -0,0 +1,167 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *  
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Aggregates pause and stop logic between sender and invoker threads.
+ */
+public abstract class SandeshaThread extends Thread{
+
+	private boolean runThread = false;
+	private boolean hasStoppedRunning = false;
+	private boolean hasPausedRunning = false;
+	private boolean pauseRequired = false;
+	
+	private int sleepTime;
+	
+	private ArrayList workingSequences = new ArrayList();
+	
+	protected transient ThreadFactory threadPool;
+	protected ConfigurationContext context = null;
+	
+	protected SandeshaThread(int threadPoolSize, int sleepTime){
+		threadPool = new ThreadPool(threadPoolSize, threadPoolSize);
+		this.sleepTime = sleepTime;
+	}
+	
+	protected void stopThreadForSequence(String sequenceID){
+		workingSequences.remove(sequenceID);
+		if (workingSequences.size() == 0) {
+			runThread = false;
+		}
+	}
+	
+	/**
+	 * Waits for the invoking thread to pause
+	 */
+	public synchronized void blockForPause(){
+		while(pauseRequired){
+			//someone else is requesting a pause - wait for them to finish
+			try{
+				wait(sleepTime);
+			}catch(InterruptedException e){
+				//ignore
+			}
+		}
+		
+	  //we can now request a pause - the next pause will be ours
+	  pauseRequired = true;
+				
+		if(hasStoppedRunning() || !isThreadStarted()){
+			throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+		}
+		while(!hasPausedRunning){
+			//wait for our pause to come around
+			try{
+				wait(sleepTime);
+			}catch(InterruptedException e){
+				//ignore
+			}
+			
+		}
+		//the sandesha thread is now paused
+	}
+	
+	public synchronized void finishPause(){
+		//indicate that the current pause is no longer required.
+		pauseRequired = false;
+		notifyAll();
+	}
+	
+	public synchronized void stopRunning() {
+		//NOTE: we do not take acount of pausing when stopping.
+		//The call to stop will wait until the invoker has exited the loop
+		if (isThreadStarted()) {
+			// the invoker is started so stop it
+			runThread = false;
+			// wait for it to finish
+			while (!hasStoppedRunning()) {
+				try {
+					wait(sleepTime);
+				} catch (InterruptedException e1) {
+					//ignore
+				}
+			}
+		}
+
+	}
+	
+	public synchronized boolean isThreadStarted() {
+		return runThread;
+	}
+	
+
+	public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID){
+		if (!workingSequences.contains(sequenceID))
+			workingSequences.add(sequenceID);
+		if (!isThreadStarted()) {
+			this.context = context;
+			runThread = true; // so that isStarted()=true.
+			super.start();
+		}		
+	}
+
+	protected synchronized boolean hasStoppedRunning() {
+		return hasStoppedRunning;
+	}
+	
+	protected synchronized void doPauseIfNeeded(){
+		//see if we need to pause
+			
+			while(pauseRequired){
+				if(!hasPausedRunning){
+					//let the requester of this pause know we are now pausing
+				  hasPausedRunning = true;
+				  notifyAll();						
+				}
+				//now we pause
+			  try{
+			  	wait(sleepTime);
+			  }catch(InterruptedException e){
+			  	//ignore
+			  }
+			}//end while
+			//the request to pause has finished so we are no longer pausing
+			hasPausedRunning = false;
+	}
+
+	/**
+	 * The main work loop, to be implemented by any child class.
+	 */
+	protected abstract void internalRun();
+	
+	public void run() {
+		try {
+			internalRun();
+		} finally {
+			// flag that we have exited the run loop and notify any waiting
+			// threads
+			synchronized (this) {
+				hasStoppedRunning = true;
+				notify();
+			}
+		}
+	}
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org