You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by sc...@apache.org on 2015/03/09 18:30:47 UTC

svn commit: r1665310 - /uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java

Author: schor
Date: Mon Mar  9 17:30:47 2015
New Revision: 1665310

URL: http://svn.apache.org/r1665310
Log:
[UIMA-4280] improve the start of the multi-threads to occur more contemporaneously

Modified:
    uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java

Modified: uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java
URL: http://svn.apache.org/viewvc/uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java?rev=1665310&r1=1665309&r2=1665310&view=diff
==============================================================================
--- uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java (original)
+++ uima/uimaj/trunk/uimaj-core/src/test/java/org/apache/uima/internal/util/MultiThreadUtils.java Mon Mar  9 17:30:47 2015
@@ -18,6 +18,8 @@
  */
 package org.apache.uima.internal.util;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.TestCase;
 
 /**
@@ -29,6 +31,8 @@ import junit.framework.TestCase;
  *
  */
 public class MultiThreadUtils extends TestCase {
+  
+  public final static boolean debug = false;
 
   public final static int PROCESSORS = Runtime.getRuntime().availableProcessors();
   
@@ -42,75 +46,179 @@ public class MultiThreadUtils extends Te
   //   this class extends TestCase (in order to have access to assertTrue, etc
   //   this causes the junit runner to warn if there are no "test"s in this class
   public void testDummy() {}
+
+  // also serves as a lock
+  
+  private static enum ThreadControl {
+    WAIT,   // causes test thread to wait, is the initial state 
+    RUN,    // causes test thread to run; when run is done, thread goes back to waiting and sets global entry in thread array to WAIT
+    TERMINATE,  // causes test thread to finish 
+  }
+    
+  private static final AtomicInteger numberRunning = new AtomicInteger(0);
+  
+  private static final AtomicInteger numberOfExceptions = new AtomicInteger(0);
   
+  /**
+   * On a 2 GHz i7 running Windows, it seems to take about 1 millisecond to create and start up a thread.
+   * 
+   * To get maximum likelyhood of threads all starting together, the threads are all started, but then they wait
+   * for a "go" signal.  Each thread has a "threadControl" value, which is accessed under a specific lock for the thread
+   * to insure memory synchronization; the threads have the states above.
+   * 
+   * To reduce the overhead, the logic is:
+   *   a) make the threads and start them.  They go to their wait point.
+   *   b) for the repeat loop:
+   *       b1) release all threads from wait point
+   *       b2) wait for all threads to reach their wait point again (at the end of their processing) 
+   *       
+   *       b3) repeat b1 and b2 for the repeat count.
+   *       
+   *   c) signal all threads to terminate.
+   *   
+   *   d) do the join wait for everything to finish.
+   *   
+   * @param name root name for messages and thread ids
+   * @param numberOfThreads number of threads
+   * @param repeats number of times to repeat the whole test
+   * @param run2isb the Callable to run in multiple threads, called with thread # and a string builder for messages 
+   * @param beforeRepeatArg a Runnable or null, to run before each outer "repeat".
+   * @throws Exception
+   */
   public static void tstMultiThread(
-      final String name, 
+      final String name,  // name root for messages and thread ids
       int numberOfThreads, 
       int repeats, 
-      final Run2isb run2isb, 
-      final Runnable beforeRepeat) throws Exception {
-    Thread[] threads = new Thread[numberOfThreads];
+      final Run2isb run2isb,    // the Callable that is run in a thread, passed in also are the thread # and a string builder for messages
+      final Runnable beforeRepeatArg  // called before every repeat, use null or MultiThreadUtils.emptyReset if not wanted.
+      ) throws Exception {
+       
+    final Runnable beforeRepeat = (null == beforeRepeatArg) ? emptyReset : beforeRepeatArg;
+    final Thread[] threads = new Thread[numberOfThreads];
 
     final Throwable[] thrown = new Throwable[1];
+    final ThreadControl[][] threadState = new ThreadControl[numberOfThreads][1];
+    
     thrown[0] = null;
     
-    for (int r = 0; r < repeats; r++) {
-      beforeRepeat.run();
-      final int finalR = r;
-      try {
-        for (int i = 0; i < numberOfThreads; i++) {
-          final int finalI = i;
-          threads[i] = new Thread(new Runnable() {
+    final int[] repeatNumber = {0};
+    
+    long startTime = System.nanoTime();
+    for (int i = 0; i < numberOfThreads; i++) {
+      final int finalI = i;
+      threadState[i][0] = ThreadControl.WAIT;
+      
+      // We make the runnable inside this loop to capture the thread number
+      Runnable runnable = new Runnable() {         
+        public void run() {
+          // sb is for debugging; it's passed into the runnable which can choose to print it or not
+          StringBuilder sb = new StringBuilder(80);
+        
+          while (true) {
+            synchronized (threadState[finalI]) {
+              while (threadState[finalI][0] == ThreadControl.WAIT) {
+                try {
+                  threadState[finalI].wait();
+                } catch (InterruptedException e) {
+                }
+                if (threadState[finalI][0] == ThreadControl.TERMINATE) {
+                  return;
+                }
+              }
+            }
+            
+            synchronized(threadState[finalI]) {
+              assertEquals(ThreadControl.RUN, threadState[finalI][0]);
+            }
             
-            public void run() {
-              // sb is for debugging; it's passed into the runnable which can choose to print it or not
-              StringBuilder sb = new StringBuilder(80);
-              try {
-                sb.append(name).append(", thread ").append(finalI).append(' ');
-                run2isb.call(finalI, finalR, sb);
-              } catch (Throwable e) {
-                System.err.format("%s: Runnable threw exception %s%n", name, e.getMessage());
-                e.printStackTrace(System.err);
-                thrown[0] = e;
-                throw new RuntimeException(e); // silly, just causes thread to end
+            try {
+              assertTrue(numberRunning.get() > 0);
+              sb.append(name).append(", thread ").append(finalI).append(' ');
+//              System.out.println(sb.toString());
+              run2isb.call(finalI, repeatNumber[0], sb);
+            } catch (Throwable e) {
+              System.err.format("%s: Runnable threw exception %s%n", name, e.getMessage());
+              e.printStackTrace(System.err);
+              numberOfExceptions.incrementAndGet();
+              synchronized (numberOfExceptions) {
+                numberOfExceptions.notify();
               }
-            }} );
-          threads[i].setName(name + " Thread " + i);
-          threads[i].setPriority(Thread.NORM_PRIORITY - 1);
-          threads[i].start();
-        }
-      
-        for (int i = 0; i < numberOfThreads; i++) {
-          try {
-            if (thrown[0] != null) {
-              assertTrue(false);
+              thrown[0] = e;
+//              synchronized (threadState[finalI]) {
+//                threadState[finalI][0] = ThreadControl.EXCEPTION;
+//              }
             }
-            threads[i].join();
-            if (thrown[0] != null) {
-              thrown[0].printStackTrace();
-              assertTrue(false);
+            synchronized(threadState[finalI]) {
+              threadState[finalI][0] = ThreadControl.WAIT;
+            }
+            numberRunning.decrementAndGet();
+            synchronized (numberRunning) {
+              numberRunning.notify();              
             }
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-            assertTrue(false);
-          }
-        }
-      } finally {
-        // cleanup
-        // interrupt live threads
-        // wait for all threads to terminate
-        for (Thread thread : threads) {
-          if (thread.isAlive()) {
-            thread.interrupt();
           }
+        }};
+      threads[i] = new Thread(runnable);
+      threads[i].setName(name + " Thread " + i);
+      threads[i].setPriority(Thread.NORM_PRIORITY - 1);
+      threads[i].start();
+    }
+    if (debug) {
+      System.out.format("Time to create %d threads: %,d microsec%n", numberOfThreads, (System.nanoTime() - startTime) / 1000);
+    }
+
+    for (int r = 0; r < repeats; r++) {
+      beforeRepeat.run();
+      
+      repeatNumber[0] = r;
+      assertTrue(numberRunning.get() == 0);
+      assertTrue(numberOfExceptions.get() == 0);
+      
+      startTime = System.nanoTime();       
+      
+      // release all threads from wait point
+      for (int i = 0; i < numberOfThreads; i++) {
+        synchronized (threadState[i]) {
+          assertEquals(ThreadControl.WAIT, threadState[i][0]);
+          threadState[i][0] = ThreadControl.RUN;
+          numberRunning.incrementAndGet();
+          threadState[i].notify();
         }
-        for (Thread thread : threads) {
-          if (thread.isAlive()) {
-            thread.join();
-          }
+      }
+      if (debug) {
+        System.out.format("repeat %,d Time to release %d threads from wait: %,d microsec%n", r, numberOfThreads, (System.nanoTime() - startTime) / 1000);
+      }
+      
+      // wait for all threads to return to wait state
+      
+      synchronized (numberRunning) {
+        while (numberRunning.get() > 0) {
+          numberRunning.wait();
+        }          
+      }
+      for (int i = 0; i < numberOfThreads; i++) {
+       synchronized (threadState[i]) {
+          assertEquals(ThreadControl.WAIT, threadState[i][0]);          
         }
       }
-    }   
+    }  // end of repeat loop
+    
+    for (int i = 0; i < numberOfThreads; i++) {
+      synchronized (threadState[i]) {
+        threadState[i][0] = ThreadControl.TERMINATE;
+        threadState[i].notify();
+      }
+    }
+    
+    // wait for all threads to terminate
+    for (Thread thread : threads) {
+      if (thread.isAlive()) {
+        thread.interrupt();
+      }
+    }
+    for (Thread thread : threads) {
+      if (thread.isAlive()) {
+        thread.join();
+      }
+    }
   }
-
 }