You are viewing a plain text version of this content. The canonical link for it is here.
Posted to batik-dev@xmlgraphics.apache.org by de...@apache.org on 2001/08/06 21:38:55 UTC
cvs commit: xml-batik/test-sources/org/apache/batik/util RunnableQueueTest.java
deweese 01/08/06 12:38:55
Modified: sources/org/apache/batik/util RunnableQueue.java
test-sources/org/apache/batik/util RunnableQueueTest.java
Log:
1) Cleaned up notion of 'suspendExecution'.
Required addition of transitory 'Suspending' state.
2) Augmented the Test code to test random suspension and resumption by
two threads. :)
3) Split synchronization mutexes to allow for adding Runnables while
runHandler methods are running (wasn't comfortable assuming these
were 'quick').
Revision Changes Path
1.4 +160 -45 xml-batik/sources/org/apache/batik/util/RunnableQueue.java
Index: RunnableQueue.java
===================================================================
RCS file: /home/cvs/xml-batik/sources/org/apache/batik/util/RunnableQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- RunnableQueue.java 2001/08/06 15:35:13 1.3
+++ RunnableQueue.java 2001/08/06 19:38:55 1.4
@@ -16,17 +16,57 @@
* invocation in a single thread.
*
* @author <a href="mailto:stephane@hillion.org">Stephane Hillion</a>
- * @version $Id: RunnableQueue.java,v 1.3 2001/08/06 15:35:13 deweese Exp $
+ * @version $Id: RunnableQueue.java,v 1.4 2001/08/06 19:38:55 deweese Exp $
*/
public class RunnableQueue implements Runnable {
/**
- * whether this thread is suspended.
+ * Type-safe enumeration of queue states.
*/
- protected boolean suspended;
+ public static class RunnableQueueState extends Object {
+ final String value;
+ private RunnableQueueState(String value) {
+ this.value = value.intern(); }
+ public String getValue() { return value; }
+ public String toString() {
+ return "[RunnableQueueState: " + value + "]"; }
+ }
+
+ /**
+ * The queue is in the processes of running tasks.
+ */
+ public static final RunnableQueueState RUNNING
+ = new RunnableQueueState("Running");
+
+ /**
+ * The queue may still be running tasks but as soon as possible
+ * will go to SUSPENDED state.
+ */
+ public static final RunnableQueueState SUSPENDING
+ = new RunnableQueueState("Suspending");
+
+ /**
+ * The queue is no longer running any tasks and will not
+ * run any tasks until resumeExecution is called.
+ */
+ public static final RunnableQueueState SUSPENDED
+ = new RunnableQueueState("Suspended");
+
+ /**
+ * The Suspension state of this thread.
+ */
+ protected RunnableQueueState state;
+
+ /**
+ * Object to synchronize/wait/notify for suspension
+ * issues.
+ */
+ protected Object stateLock = new Object();
+
/**
- * The Runnable objects list.
+ * The Runnable objects list, also used as synchoronization point
+ * for pushing/poping runables.
*/
protected DoublyLinkedList list = new DoublyLinkedList();
@@ -65,39 +105,50 @@
public void run() {
synchronized (this) {
runnableQueueThread = Thread.currentThread();
+ // Wake the create method so it knows we are in
+ // our run and ready to go.
notify();
}
+
Link l;
Runnable rable;
try {
while (!Thread.currentThread().isInterrupted()) {
- synchronized (this) {
- if (suspended) {
- if (runHandler != null) {
- runHandler.executionSuspended(this);
+
+ // Mutex for suspention work.
+ synchronized (stateLock) {
+ if (state != RUNNING) {
+ state = SUSPENDED;
+
+ // notify suspendExecution in case it is
+ // waiting til we shut down.
+ stateLock.notifyAll();
+
+ executionSuspended();
+
+ while (state != RUNNING) {
+ state = SUSPENDED;
+ // Wait until resumeExecution called.
+ stateLock.wait();
}
- while (suspended) {
- wait();
- }
- if (runHandler != null) {
- runHandler.executionResumed(this);
- }
+
+ executionResumed();
}
+ }
+ synchronized (list) {
l = (Link)list.pop();
if (l == null) {
- wait();
+ // No item to run, wait till there is one.
+ list.wait();
continue; // start loop over again...
}
+
rable = l.runnable;
}
rable.run();
l.unlock();
- synchronized (this) {
- if (runHandler != null) {
- runHandler.runnableInvoked(this, rable);
- }
- }
+ runnableInvoked(rable);
}
} catch (InterruptedException e) {
} finally {
@@ -121,13 +172,15 @@
* An exception is thrown if the RunnableQueue was not started.
* @throws IllegalStateException if getThread() is null.
*/
- public synchronized void invokeLater(Runnable r) {
+ public void invokeLater(Runnable r) {
if (runnableQueueThread == null) {
throw new IllegalStateException
("RunnableQueue not started or has exited");
}
- list.push(new Link(r));
- notify();
+ synchronized (list) {
+ list.push(new Link(r));
+ list.notify();
+ }
}
/**
@@ -149,41 +202,71 @@
}
LockableLink l = new LockableLink(r);
- synchronized (this) {
+ synchronized (list) {
list.push(l);
- notify();
+ list.notify();
}
l.lock();
}
- public synchronized boolean isSuspended() { return suspended; }
+ public RunnableQueueState getQueueState() {
+ synchronized (stateLock) {
+ return state;
+ }
+ }
/**
- * Suspends the execution of this queue.
- * @throws IllegalStateException if getThread() is null.
- */
- public synchronized void suspendExecution() {
+ * Suspends the execution of this queue after the current runnable
+ * completes.
+ * @param waitTillSuspended if true this method will not return
+ * until the queue has suspended (no runnable in progress
+ * or about to be in progress). If resumeExecution is
+ * called while waiting will simply return (this really
+ * indicates a race condition in your code). This may
+ * return before an associated RunHandler is notified.
+ * @throws IllegalStateException if getThread() is null. */
+ public void suspendExecution(boolean waitTillSuspended) {
if (runnableQueueThread == null) {
throw new IllegalStateException
("RunnableQueue not started or has exited");
}
-
- suspended = true;
+ synchronized (stateLock) {
+ if (state == SUSPENDED)
+ // already suspended...
+ return;
+
+ if (state == RUNNING) {
+ state = SUSPENDING;
+ synchronized (list) {
+ // Wake up run thread if it is waiting for jobs,
+ // so we go into the suspended case (notifying
+ // run-handler etc...)
+ list.notify();
+ }
+ }
+
+ if (waitTillSuspended)
+ try {
+ stateLock.wait();
+ } catch(InterruptedException ie) { }
+ }
}
/**
* Resumes the execution of this queue.
* @throws IllegalStateException if getThread() is null.
*/
- public synchronized void resumeExecution() {
+ public void resumeExecution() {
if (runnableQueueThread == null) {
throw new IllegalStateException
("RunnableQueue not started or has exited");
}
- if (suspended) {
- suspended = false;
- notify();
+ synchronized (stateLock) {
+ if (state != RUNNING) {
+ state = RUNNING;
+ stateLock.notifyAll(); // wake it up.
+ }
}
}
@@ -194,21 +277,22 @@
* to suspend (with <tt>suspendExecution()</tt>) the queue.
* @throws IllegalStateException if getThread() is null.
*/
- public synchronized List getRunnableList() {
+ public List getRunnableList() {
if (runnableQueueThread == null) {
throw new IllegalStateException
("RunnableQueue not started or has exited");
}
List result = new LinkedList();
- Link l, h;
- l = h = (Link)list.getHead();
- if (h==null) return result;
- do {
- result.add(l.runnable);
- l = (Link)l.getNext();
- } while (l != h);
-
+ synchronized (list) {
+ Link l, h;
+ l = h = (Link)list.getHead();
+ if (h==null) return result;
+ do {
+ result.add(l.runnable);
+ l = (Link)l.getNext();
+ } while (l != h);
+ }
return result;
}
@@ -224,6 +308,37 @@
*/
public synchronized RunHandler getRunHandler() {
return runHandler;
+ }
+
+ /**
+ * Called when execution is being suspended.
+ * Currently just notifies runHandler
+ */
+ protected synchronized void executionSuspended() {
+ if (runHandler != null) {
+ runHandler.executionSuspended(this);
+ }
+ }
+
+ /**
+ * Called when execution is being resumed.
+ * Currently just notifies runHandler
+ */
+ protected synchronized void executionResumed() {
+ if (runHandler != null) {
+ runHandler.executionResumed(this);
+ }
+ }
+
+ /**
+ * Called when a Runnable completes.
+ * Currently just notifies runHandler
+ * @param rable The runnable that just completed.
+ */
+ protected synchronized void runnableInvoked(Runnable rable ) {
+ if (runHandler != null) {
+ runHandler.runnableInvoked(this, rable);
+ }
}
/**
1.2 +42 -11 xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java
Index: RunnableQueueTest.java
===================================================================
RCS file: /home/cvs/xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- RunnableQueueTest.java 2001/08/06 15:35:13 1.1
+++ RunnableQueueTest.java 2001/08/06 19:38:55 1.2
@@ -20,6 +20,8 @@
public int nThreads;
public int activeThreads;
+ public Random rand;
+ public RunnableQueue rq;
/**
* Constructor
@@ -43,32 +45,61 @@
* of the test's internal operation fails.
*/
public TestReport runImpl() throws Exception {
- RunnableQueue rq = RunnableQueue.createRunnableQueue();
+ rq = RunnableQueue.createRunnableQueue();
List l = new ArrayList(nThreads);
- Random rand = new Random(2345);
+ rand = new Random(2345);
+
+ // Two switch flickers to make things interesting...
+ l.add(new SwitchFlicker());
+ l.add(new SwitchFlicker());
+
for (int i=0; i<nThreads; i++) {
- Runnable rqRable = new RQRable(i, rand.nextInt(50));
+ Runnable rqRable = new RQRable(i, rand.nextInt(50)+1);
l.add(new TPRable(rq, i, rand.nextBoolean(),
- rand.nextInt(1000), 20, rqRable));
+ rand.nextInt(500)+1, 20, rqRable));
}
+
synchronized (this) {
ThreadPounder tp = new ThreadPounder(l);
tp.start();
activeThreads = nThreads;
while (activeThreads != 0) {
- rq.suspendExecution();
- System.out.println("Suspended");
- wait(rand.nextInt(100));
- if (activeThreads == 0) break;
- System.out.println("Resuming");
- rq.resumeExecution();
- wait(rand.nextInt(500));
+ wait();
}
}
System.exit(0);
return null;
+ }
+
+ public class SwitchFlicker implements Runnable {
+ public void run() {
+ boolean suspendp, waitp;
+ int time;
+ while (true) {
+ try {
+ synchronized (rand) {
+ suspendp = rand.nextBoolean();
+ waitp = rand.nextBoolean();
+ time = rand.nextInt(500);
+ }
+ if (suspendp) {
+ // 1/2 of the time suspend, 1/2 time wait, 1/2 the
+ // time don't
+ rq.suspendExecution(waitp);
+ System.out.println("Suspended - " +
+ (waitp?"Wait":"Later"));
+ Thread.sleep(time/10);
+ } else {
+ // 1/2 the time resume
+ rq.resumeExecution();
+ System.out.println("Resumed");
+ Thread.sleep(time);
+ }
+ } catch(InterruptedException ie) { }
+ }
+ }
}
public class TPRable implements Runnable {
---------------------------------------------------------------------
To unsubscribe, e-mail: batik-dev-unsubscribe@xml.apache.org
For additional commands, e-mail: batik-dev-help@xml.apache.org