You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2017/04/25 14:57:02 UTC
tez git commit: TEZ-3695. TestTezSharedExecutor fails sporadically.
(jlowe)
Repository: tez
Updated Branches:
refs/heads/master ad8a80d2b -> db6f05f7f
TEZ-3695. TestTezSharedExecutor fails sporadically. (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db6f05f7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db6f05f7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db6f05f7
Branch: refs/heads/master
Commit: db6f05f7fc7512d37e835f785fc8995b16d4c63b
Parents: ad8a80d
Author: Jason Lowe <jl...@yahoo-inc.com>
Authored: Tue Apr 25 09:55:34 2017 -0500
Committer: Jason Lowe <jl...@yahoo-inc.com>
Committed: Tue Apr 25 09:55:34 2017 -0500
----------------------------------------------------------------------
.../tez/common/TestTezSharedExecutor.java | 52 +++++++-------------
1 file changed, 18 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/db6f05f7/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
index 8d87846..9ea07ef 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezSharedExecutor.java
@@ -23,10 +23,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
@@ -57,16 +56,14 @@ public class TestTezSharedExecutor {
}
private static class Wait implements Runnable {
- private final Object ref;
- Wait(Object ref) {
- this.ref = ref == null ? this : ref;
+ private final CountDownLatch latch;
+ Wait(CountDownLatch latch) {
+ this.latch = latch;
}
@Override
public void run() {
try {
- synchronized (ref) {
- ref.wait();
- }
+ latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -113,12 +110,6 @@ public class TestTezSharedExecutor {
}
}
- private void _notify(Object obj) {
- synchronized (obj) {
- obj.notify();
- }
- }
-
private TezSharedExecutor sharedExecutor;
@Before
@@ -132,7 +123,7 @@ public class TestTezSharedExecutor {
sharedExecutor = null;
}
- @Test(timeout=2000)
+ @Test(timeout=10000)
public void testSimpleExecution() throws Exception {
ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
@@ -167,42 +158,35 @@ public class TestTezSharedExecutor {
}
}
- @Test(timeout=5000)
+ @Test(timeout=10000)
public void testAwaitTermination() throws Exception {
ExecutorService service = sharedExecutor.createExecutorService(1, "await-termination");
+ CountDownLatch latch = new CountDownLatch(1);
- final Runnable runnable = new Wait(null);
+ final Runnable runnable = new Wait(latch);
service.submit(runnable);
service.shutdown();
- // No notify sent hence it should fail.
+ // Task stuck on latch hence it should fail to terminate.
Assert.assertFalse(service.awaitTermination(100, TimeUnit.MILLISECONDS));
Assert.assertFalse(service.isTerminated());
Assert.assertTrue(service.isShutdown());
- Timer timer = new Timer(true);
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- _notify(runnable);
- }
- }, 100);
+ latch.countDown();
- // Highly unlikely that there are intermittent failures, but a possiblity :-(.
- Assert.assertTrue(service.awaitTermination(1, TimeUnit.SECONDS));
+ Assert.assertTrue(service.awaitTermination(5, TimeUnit.SECONDS));
Assert.assertTrue(service.isTerminated());
Assert.assertTrue(service.isShutdown());
-
- timer.cancel();
}
- @Test(timeout=2000)
+ @Test(timeout=10000)
public void testSerialExecution() throws Exception {
ExecutorService service = sharedExecutor.createExecutorService(1, "serial-test");
+ CountDownLatch latch = new CountDownLatch(1);
// Since it is serial we should never get concurrent modification exception too.
+ Future<?> f1 = service.submit(new Wait(latch));
List<Integer> list = new ArrayList<>();
- Future<?> f1 = service.submit(new Wait(list));
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
futures.add(service.submit(new Appender<Integer>(list, i)));
@@ -211,9 +195,9 @@ public class TestTezSharedExecutor {
// This shutdown does not prevent already submitted tasks from completing.
service.shutdown();
- // Until we notify nothing moves forward.
+ // Until we release the task from the latch nothing moves forward.
Assert.assertEquals(0, list.size());
- _notify(list);
+ latch.countDown();
f1.get();
// Wait for all futures to finish.
@@ -224,7 +208,7 @@ public class TestTezSharedExecutor {
Assert.assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), list);
}
- @Test(timeout=5000)
+ @Test(timeout=10000)
public void testParallelExecution() throws Exception {
ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();