You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/09 07:55:31 UTC

incubator-tephra git commit: Fix flaky test (PooledClientProviderTest). Previously, the entire client pool was not exhausted.

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 307a585cb -> 67eaa768c


Fix flaky test (PooledClientProviderTest). Previously, the entire client pool was not exhausted.

This closes #8

Signed-off-by: poorna <po...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/67eaa768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/67eaa768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/67eaa768

Branch: refs/heads/master
Commit: 67eaa768c9e9ff117315613eabbcf8219f25cfde
Parents: 307a585
Author: Ali Anwar <an...@berkeley.edu>
Authored: Thu Sep 8 23:34:44 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Fri Sep 9 00:55:16 2016 -0700

----------------------------------------------------------------------
 .../distributed/PooledClientProviderTest.java   | 44 ++++++++++----------
 1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/67eaa768/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
index 507cefb..dce8078 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
@@ -135,30 +135,31 @@ public class PooledClientProviderTest {
       }
     });
 
-
     //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
-    List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
-    CountDownLatch countDownLatch = new CountDownLatch(1);
+    List<Future<Integer>> clientIds = new ArrayList<>();
+    // We want to ensure that the pool is fully exhausted before any client is released.
+    // No client is released until MAX_CLIENT_COUNT clients have been fetched from the pool. The last thread then
+    // reuses one of the released clients.
+    CountDownLatch clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT);
     ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
     for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
-      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch)));
+      clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch)));
     }
-    countDownLatch.countDown();
 
-    Set<Integer> ids = new HashSet<Integer>();
+    Set<Integer> ids = new HashSet<>();
     for (Future<Integer> id : clientIds) {
       ids.add(id.get());
     }
     Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
 
-    // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value.
-    // one of the threads should throw a TimeOutException, because the other threads don't release their clients
-    // within the configured timeout.
-    countDownLatch = new CountDownLatch(1);
+    // Now, try it again with a countdown latch equal to the number of threads. None of them will progress
+    // past it until each has either acquired a client or timed out while attempting to obtain one.
+    // One of the threads should throw a TimeoutException, because the other threads don't release their clients
+    // until then and the client pool is too small for the number of threads.
+    clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT + 1);
     for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
-      clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch)));
+      clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch)));
     }
-    countDownLatch.countDown();
     int numTimeoutExceptions = 0;
     for (Future<Integer> clientId : clientIds) {
       try {
@@ -173,7 +174,7 @@ public class PooledClientProviderTest {
                                       CLIENT_OBTAIN_TIMEOUT),
                         1, numTimeoutExceptions);
 
-    executor.shutdown();
+    executor.shutdownNow();
   }
 
   private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception {
@@ -189,24 +190,23 @@ public class PooledClientProviderTest {
 
   private static class RetrieveClient implements Callable<Integer> {
     private final PooledClientProvider pool;
-    private final long holdClientMs;
-    private final CountDownLatch begin;
+    private final CountDownLatch done;
 
-    public RetrieveClient(PooledClientProvider pool, long holdClientMs,
-                          CountDownLatch begin) {
+    RetrieveClient(PooledClientProvider pool, CountDownLatch done) {
       this.pool = pool;
-      this.holdClientMs = holdClientMs;
-      this.begin = begin;
+      this.done = done;
     }
 
     @Override
     public Integer call() throws Exception {
-      begin.await();
       try (CloseableThriftClient client = pool.getCloseableClient()) {
         int id = System.identityHashCode(client.getThriftClient());
-        // "use" the client for a configured amount of milliseconds
-        Thread.sleep(holdClientMs);
+        done.countDown();
+        done.await();
         return id;
+      } catch (TimeoutException e) {
+        done.countDown();
+        throw e;
       }
     }
   }