You are viewing a plain text version of this content. The canonical link for it is here.
Posted to sandesha-dev@ws.apache.org by ga...@apache.org on 2006/12/06 12:50:12 UTC
svn commit: r483043 -
/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
Author: gatfora
Date: Wed Dec 6 03:50:07 2006
New Revision: 483043
URL: http://svn.apache.org/viewvc?view=rev&rev=483043
Log:
Adding the SandeshaThread see SANDESHA2-61
Added:
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
Added: webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java
URL: http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java?view=auto&rev=483043
==============================================================================
--- webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java (added)
+++ webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/SandeshaThread.java Wed Dec 6 03:50:07 2006
@@ -0,0 +1,167 @@
+/*
+ * Copyright 1999-2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.sandesha2.workers;
+
+import java.util.ArrayList;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.util.threadpool.ThreadFactory;
+import org.apache.axis2.util.threadpool.ThreadPool;
+import org.apache.sandesha2.i18n.SandeshaMessageHelper;
+import org.apache.sandesha2.i18n.SandeshaMessageKeys;
+
+/**
+ * Aggregates pause and stop logic between sender and invoker threads.
+ */
+public abstract class SandeshaThread extends Thread{
+
+ private boolean runThread = false;
+ private boolean hasStoppedRunning = false;
+ private boolean hasPausedRunning = false;
+ private boolean pauseRequired = false;
+
+ private int sleepTime;
+
+ private ArrayList workingSequences = new ArrayList();
+
+ protected transient ThreadFactory threadPool;
+ protected ConfigurationContext context = null;
+
+ protected SandeshaThread(int threadPoolSize, int sleepTime){
+ threadPool = new ThreadPool(threadPoolSize, threadPoolSize);
+ this.sleepTime = sleepTime;
+ }
+
+ protected void stopThreadForSequence(String sequenceID){
+ workingSequences.remove(sequenceID);
+ if (workingSequences.size() == 0) {
+ runThread = false;
+ }
+ }
+
+ /**
+ * Waits for the invoking thread to pause
+ */
+ public synchronized void blockForPause(){
+ while(pauseRequired){
+ //someone else is requesting a pause - wait for them to finish
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+ }
+
+ //we can now request a pause - the next pause will be ours
+ pauseRequired = true;
+
+ if(hasStoppedRunning() || !isThreadStarted()){
+ throw new IllegalStateException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotPauseThread));
+ }
+ while(!hasPausedRunning){
+ //wait for our pause to come around
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+
+ }
+ //the sandesha thread is now paused
+ }
+
+ public synchronized void finishPause(){
+ //indicate that the current pause is no longer required.
+ pauseRequired = false;
+ notifyAll();
+ }
+
+ public synchronized void stopRunning() {
+ //NOTE: we do not take acount of pausing when stopping.
+ //The call to stop will wait until the invoker has exited the loop
+ if (isThreadStarted()) {
+ // the invoker is started so stop it
+ runThread = false;
+ // wait for it to finish
+ while (!hasStoppedRunning()) {
+ try {
+ wait(sleepTime);
+ } catch (InterruptedException e1) {
+ //ignore
+ }
+ }
+ }
+
+ }
+
+ public synchronized boolean isThreadStarted() {
+ return runThread;
+ }
+
+
+ public synchronized void runThreadForSequence(ConfigurationContext context, String sequenceID){
+ if (!workingSequences.contains(sequenceID))
+ workingSequences.add(sequenceID);
+ if (!isThreadStarted()) {
+ this.context = context;
+ runThread = true; // so that isStarted()=true.
+ super.start();
+ }
+ }
+
+ protected synchronized boolean hasStoppedRunning() {
+ return hasStoppedRunning;
+ }
+
+ protected synchronized void doPauseIfNeeded(){
+ //see if we need to pause
+
+ while(pauseRequired){
+ if(!hasPausedRunning){
+ //let the requester of this pause know we are now pausing
+ hasPausedRunning = true;
+ notifyAll();
+ }
+ //now we pause
+ try{
+ wait(sleepTime);
+ }catch(InterruptedException e){
+ //ignore
+ }
+ }//end while
+ //the request to pause has finished so we are no longer pausing
+ hasPausedRunning = false;
+ }
+
+ /**
+ * The main work loop, to be implemented by any child class.
+ */
+ protected abstract void internalRun();
+
+ public void run() {
+ try {
+ internalRun();
+ } finally {
+ // flag that we have exited the run loop and notify any waiting
+ // threads
+ synchronized (this) {
+ hasStoppedRunning = true;
+ notify();
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: sandesha-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: sandesha-dev-help@ws.apache.org