You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2017/04/25 14:57:02 UTC

tez git commit: TEZ-3695. TestTezSharedExecutor fails sporadically. (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master ad8a80d2b -> db6f05f7f


TEZ-3695. TestTezSharedExecutor fails sporadically. (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db6f05f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db6f05f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db6f05f7

Branch: refs/heads/master
Commit: db6f05f7fc7512d37e835f785fc8995b16d4c63b
Parents: ad8a80d
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Tue Apr 25 09:55:34 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Tue Apr 25 09:55:34 2017 -0500

----------------------------------------------------------------------
 .../tez/common/TestTezSharedExecutor.java       | 52 +++++++-------------
 1 file changed, 18 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/db6f05f7/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
index 8d87846..9ea07ef 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
@@ -23,10 +23,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -57,16 +56,14 @@ public class TestTezSharedExecutor {
   }
 
   private static class Wait implements Runnable {
-    private final Object ref;
-    Wait(Object ref) {
-      this.ref = ref == null ? this : ref;
+    private final CountDownLatch latch;
+    Wait(CountDownLatch latch) {
+      this.latch = latch;
     }
     @Override
     public void run() {
       try {
-        synchronized (ref) {
-          ref.wait();
-        }
+        latch.await();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -113,12 +110,6 @@ public class TestTezSharedExecutor {
     }
   }
 
-  private void _notify(Object obj) {
-    synchronized (obj) {
-      obj.notify();
-    }
-  }
-
   private TezSharedExecutor sharedExecutor;
 
   @Before
@@ -132,7 +123,7 @@ public class TestTezSharedExecutor {
     sharedExecutor = null;
   }
 
-  @Test(timeout=2000)
+  @Test(timeout=10000)
   public void testSimpleExecution() throws Exception {
     ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
 
@@ -167,42 +158,35 @@ public class TestTezSharedExecutor {
     }
   }
 
-  @Test(timeout=5000)
+  @Test(timeout=10000)
   public void testAwaitTermination() throws Exception {
     ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination");
+    CountDownLatch latch = new CountDownLatch(1);
 
-    final Runnable runnable = new Wait(null);
+    final Runnable runnable = new Wait(latch);
     service.submit(runnable);
     service.shutdown();
 
-    // No notify sent hence it should fail.
+    // Task stuck on latch hence it should fail to terminate.
     Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS));
     Assert.assertFalse(service.isTerminated());
     Assert.assertTrue(service.isShutdown());
 
-    Timer timer = new Timer(true);
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        _notify(runnable);
-      }
-    }, 100);
+    latch.countDown();
 
-    // Highly unlikely that there are intermittent failures, but a possiblity :-(.
-    Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS));
+    Assert.assertTrue(service.awaitTermination(5, TimeUnit.SECONDS));
     Assert.assertTrue(service.isTerminated());
     Assert.assertTrue(service.isShutdown());
-
-    timer.cancel();
   }
 
-  @Test(timeout=2000)
+  @Test(timeout=10000)
   public void testSerialExecution() throws Exception {
     ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test");
+    CountDownLatch latch = new CountDownLatch(1);
 
     // Since it is serial we should never get concurrent modification exception too.
+    Future<?> f1 = service.submit(new Wait(latch));
     List<Integer> list = new ArrayList<>();
-    Future<?> f1 = service.submit(new Wait(list));
     List<Future<?>> futures = new ArrayList<>();
     for (int i = 0; i < 10; ++i) {
       futures.add(service.submit(new Appender<Integer>(list, i)));
@@ -211,9 +195,9 @@ public class TestTezSharedExecutor {
     // This shutdown does not prevent already submitted tasks from completing.
     service.shutdown();
 
-    // Until we notify nothing moves forward.
+    // Until we release the task from the latch nothing moves forward.
     Assert.assertEquals(0, list.size());
-    _notify(list);
+    latch.countDown();
     f1.get();
 
     // Wait for all futures to finish.
@@ -224,7 +208,7 @@ public class TestTezSharedExecutor {
     Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list);
   }
 
-  @Test(timeout=5000)
+  @Test(timeout=10000)
   public void testParallelExecution() throws Exception {
     ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();