You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2015/03/09 18:30:47 UTC
svn commit: r1665310 -
/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
Author: schor
Date: Mon Mar 9 17:30:47 2015
New Revision: 1665310
URL: http://svn.apache.org/r1665310
Log:
[UIMA-4280] improve the start of the multi-threads to occur more contemporaneously
Modified:
uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java?rev=1665310&r1=1665309&r2=1665310&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java Mon Mar 9 17:30:47 2015
@@ -18,6 +18,8 @@
*/
package org.apache.uima.internal.util;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.TestCase;
/**
@@ -29,6 +31,8 @@ import junit.framework.TestCase;
*
*/
public class MultiThreadUtils extends TestCase {
+
+ public final static boolean debug = false;
public final static int PROCESSORS = Runtime.getRuntime().availableProcessors();
@@ -42,75 +46,179 @@ public class MultiThreadUtils extends Te
// this class extends TestCase (in order to have access to assertTrue, etc
// this causes the junit runner to warn if there are no "test"s in this class
public void testDummy() {}
+
+ // Per-thread control states; each thread's one-element state array (threadState[i]) also serves as that thread's lock
+
+ private static enum ThreadControl {
+ WAIT, // causes test thread to wait, is the initial state
+ RUN, // causes test thread to run; when run is done, thread goes back to waiting and sets global entry in thread array to WAIT
+ TERMINATE, // causes test thread to finish
+ }
+
+ private static final AtomicInteger numberRunning = new AtomicInteger(0);
+
+ private static final AtomicInteger numberOfExceptions = new AtomicInteger(0);
+ /**
+ * On a 2 GHz i7 running Windows, it seems to take about 1 millisecond to create and start up a thread.
+ *
+ * To maximize the likelihood of all threads starting together, the threads are all started, but then they wait
+ * for a "go" signal. Each thread has a "threadControl" value, which is accessed under a specific lock for the thread
+ * to ensure memory synchronization; the threads have the states above.
+ *
+ * To reduce the overhead, the logic is:
+ * a) make the threads and start them. They go to their wait point.
+ * b) for the repeat loop:
+ * b1) release all threads from wait point
+ * b2) wait for all threads to reach their wait point again (at the end of their processing)
+ *
+ * b3) repeat b1 and b2 for the repeat count.
+ *
+ * c) signal all threads to terminate.
+ *
+ * d) do the join wait for everything to finish.
+ *
+ * @param name root name for messages and thread ids
+ * @param numberOfThreads number of threads
+ * @param repeats number of times to repeat the whole test
+ * @param run2isb the Callable to run in multiple threads, called with the thread #, the repeat #, and a string builder for messages
+ * @param beforeRepeatArg a Runnable or null, to run before each outer "repeat".
+ * @throws Exception
+ */
public static void tstMultiThread(
- final String name,
+ final String name, // name root for messages and thread ids
int numberOfThreads,
int repeats,
- final Run2isb run2isb,
- final Runnable beforeRepeat) throws Exception {
- Thread[] threads = new Thread[numberOfThreads];
+ final Run2isb run2isb, // the Callable that is run in a thread, passed in also are the thread # and a string builder for messages
+ final Runnable beforeRepeatArg // called before every repeat, use null or MultiThreadUtils.emptyReset if not wanted.
+ ) throws Exception {
+
+ final Runnable beforeRepeat = (null == beforeRepeatArg) ? emptyReset : beforeRepeatArg;
+ final Thread[] threads = new Thread[numberOfThreads];
final Throwable[] thrown = new Throwable[1];
+ final ThreadControl[][] threadState = new ThreadControl[numberOfThreads][1];
+
thrown[0] = null;
- for (int r = 0; r < repeats; r++) {
- beforeRepeat.run();
- final int finalR = r;
- try {
- for (int i = 0; i < numberOfThreads; i++) {
- final int finalI = i;
- threads[i] = new Thread(new Runnable() {
+ final int[] repeatNumber = {0};
+
+ long startTime = System.nanoTime();
+ for (int i = 0; i < numberOfThreads; i++) {
+ final int finalI = i;
+ threadState[i][0] = ThreadControl.WAIT;
+
+ // We make the runnable inside this loop to capture the thread number
+ Runnable runnable = new Runnable() {
+ public void run() {
+ // sb is for debugging; it's passed into the runnable which can choose to print it or not
+ StringBuilder sb = new StringBuilder(80);
+
+ while (true) {
+ synchronized (threadState[finalI]) {
+ while (threadState[finalI][0] == ThreadControl.WAIT) {
+ try {
+ threadState[finalI].wait();
+ } catch (InterruptedException e) {
+ }
+ if (threadState[finalI][0] == ThreadControl.TERMINATE) {
+ return;
+ }
+ }
+ }
+
+ synchronized(threadState[finalI]) {
+ assertEquals(ThreadControl.RUN, threadState[finalI][0]);
+ }
- public void run() {
- // sb is for debugging; it's passed into the runnable which can choose to print it or not
- StringBuilder sb = new StringBuilder(80);
- try {
- sb.append(name).append(", thread ").append(finalI).append(' ');
- run2isb.call(finalI, finalR, sb);
- } catch (Throwable e) {
- System.err.format("%s: Runnable threw exception %s%n", name, e.getMessage());
- e.printStackTrace(System.err);
- thrown[0] = e;
- throw new RuntimeException(e); // silly, just causes thread to end
+ try {
+ assertTrue(numberRunning.get() > 0);
+ sb.append(name).append(", thread ").append(finalI).append(' ');
+// System.out.println(sb.toString());
+ run2isb.call(finalI, repeatNumber[0], sb);
+ } catch (Throwable e) {
+ System.err.format("%s: Runnable threw exception %s%n", name, e.getMessage());
+ e.printStackTrace(System.err);
+ numberOfExceptions.incrementAndGet();
+ synchronized (numberOfExceptions) {
+ numberOfExceptions.notify();
}
- }} );
- threads[i].setName(name + " Thread " + i);
- threads[i].setPriority(Thread.NORM_PRIORITY - 1);
- threads[i].start();
- }
-
- for (int i = 0; i < numberOfThreads; i++) {
- try {
- if (thrown[0] != null) {
- assertTrue(false);
+ thrown[0] = e;
+// synchronized (threadState[finalI]) {
+// threadState[finalI][0] = ThreadControl.EXCEPTION;
+// }
}
- threads[i].join();
- if (thrown[0] != null) {
- thrown[0].printStackTrace();
- assertTrue(false);
+ synchronized(threadState[finalI]) {
+ threadState[finalI][0] = ThreadControl.WAIT;
+ }
+ numberRunning.decrementAndGet();
+ synchronized (numberRunning) {
+ numberRunning.notify();
}
- } catch (InterruptedException e) {
- e.printStackTrace();
- assertTrue(false);
- }
- }
- } finally {
- // cleanup
- // interrupt live threads
- // wait for all threads to terminate
- for (Thread thread : threads) {
- if (thread.isAlive()) {
- thread.interrupt();
}
+ }};
+ threads[i] = new Thread(runnable);
+ threads[i].setName(name + " Thread " + i);
+ threads[i].setPriority(Thread.NORM_PRIORITY - 1);
+ threads[i].start();
+ }
+ if (debug) {
+ System.out.format("Time to create %d threads: %,d microsec%n", numberOfThreads, (System.nanoTime() - startTime) / 1000);
+ }
+
+ for (int r = 0; r < repeats; r++) {
+ beforeRepeat.run();
+
+ repeatNumber[0] = r;
+ assertTrue(numberRunning.get() == 0);
+ assertTrue(numberOfExceptions.get() == 0);
+
+ startTime = System.nanoTime();
+
+ // release all threads from wait point
+ for (int i = 0; i < numberOfThreads; i++) {
+ synchronized (threadState[i]) {
+ assertEquals(ThreadControl.WAIT, threadState[i][0]);
+ threadState[i][0] = ThreadControl.RUN;
+ numberRunning.incrementAndGet();
+ threadState[i].notify();
}
- for (Thread thread : threads) {
- if (thread.isAlive()) {
- thread.join();
- }
+ }
+ if (debug) {
+ System.out.format("repeat %,d Time to release %d threads from wait: %,d microsec%n", r, numberOfThreads, (System.nanoTime() - startTime) / 1000);
+ }
+
+ // wait for all threads to return to wait state
+
+ synchronized (numberRunning) {
+ while (numberRunning.get() > 0) {
+ numberRunning.wait();
+ }
+ }
+ for (int i = 0; i < numberOfThreads; i++) {
+ synchronized (threadState[i]) {
+ assertEquals(ThreadControl.WAIT, threadState[i][0]);
}
}
- }
+ } // end of repeat loop
+
+ for (int i = 0; i < numberOfThreads; i++) {
+ synchronized (threadState[i]) {
+ threadState[i][0] = ThreadControl.TERMINATE;
+ threadState[i].notify();
+ }
+ }
+
+ // wait for all threads to terminate
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ thread.interrupt();
+ }
+ }
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ thread.join();
+ }
+ }
}
-
}