You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/09/09 07:55:31 UTC
incubator-tephra git commit: Fix flaky test
(PooledClientProviderTest). Previously,
the entire client pool was not exhausted.
Repository: incubator-tephra
Updated Branches:
refs/heads/master 307a585cb -> 67eaa768c
Fix flaky test (PooledClientProviderTest). Previously, the entire client pool was not exhausted.
This closes #8
Signed-off-by: poorna <po...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/67eaa768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/67eaa768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/67eaa768
Branch: refs/heads/master
Commit: 67eaa768c9e9ff117315613eabbcf8219f25cfde
Parents: 307a585
Author: Ali Anwar <an...@berkeley.edu>
Authored: Thu Sep 8 23:34:44 2016 -0700
Committer: poorna <po...@apache.org>
Committed: Fri Sep 9 00:55:16 2016 -0700
----------------------------------------------------------------------
.../distributed/PooledClientProviderTest.java | 44 ++++++++++----------
1 file changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/67eaa768/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
index 507cefb..dce8078 100644
--- a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java
@@ -135,30 +135,31 @@ public class PooledClientProviderTest {
}
});
-
//Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more.
- List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>();
- CountDownLatch countDownLatch = new CountDownLatch(1);
+ List<Future<Integer>> clientIds = new ArrayList<>();
+ // We want to ensure that all clients have been exhausted before releasing any.
+ // Only once all the clients are fetched from the pool, will any be released. The last thread will reuse one of
+ // these clients from the pool.
+ CountDownLatch clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT);
ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1);
for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
- clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch)));
+ clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch)));
}
- countDownLatch.countDown();
- Set<Integer> ids = new HashSet<Integer>();
+ Set<Integer> ids = new HashSet<>();
for (Future<Integer> id : clientIds) {
ids.add(id.get());
}
Assert.assertEquals(MAX_CLIENT_COUNT, ids.size());
- // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value.
- // one of the threads should throw a TimeOutException, because the other threads don't release their clients
- // within the configured timeout.
- countDownLatch = new CountDownLatch(1);
+ // Now, try it again with a countdown latch equal to the number of threads. All of them will only progress
+ // past it, once they all acquire a client or time out while attempting to obtain one.
+ // One of the threads should throw a TimeoutException, because the other threads don't release their clients
+ // until then and the client thread pool isn't enough for the number of threads.
+ clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT + 1);
for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) {
- clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch)));
+ clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch)));
}
- countDownLatch.countDown();
int numTimeoutExceptions = 0;
for (Future<Integer> clientId : clientIds) {
try {
@@ -173,7 +174,7 @@ public class PooledClientProviderTest {
CLIENT_OBTAIN_TIMEOUT),
1, numTimeoutExceptions);
- executor.shutdown();
+ executor.shutdownNow();
}
private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception {
@@ -189,24 +190,23 @@ public class PooledClientProviderTest {
private static class RetrieveClient implements Callable<Integer> {
private final PooledClientProvider pool;
- private final long holdClientMs;
- private final CountDownLatch begin;
+ private final CountDownLatch done;
- public RetrieveClient(PooledClientProvider pool, long holdClientMs,
- CountDownLatch begin) {
+ RetrieveClient(PooledClientProvider pool, CountDownLatch done) {
this.pool = pool;
- this.holdClientMs = holdClientMs;
- this.begin = begin;
+ this.done = done;
}
@Override
public Integer call() throws Exception {
- begin.await();
try (CloseableThriftClient client = pool.getCloseableClient()) {
int id = System.identityHashCode(client.getThriftClient());
- // "use" the client for a configured amount of milliseconds
- Thread.sleep(holdClientMs);
+ done.countDown();
+ done.await();
return id;
+ } catch (TimeoutException e) {
+ done.countDown();
+ throw e;
}
}
}