You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by gi...@apache.org on 2018/07/06 23:34:56 UTC
[incubator-druid] branch master updated: Increase timeout for BlockingPoolTest (#5959)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new d1d9358 Increase timeout for BlockingPoolTest (#5959)
d1d9358 is described below
commit d1d935827477af9cb9d049afbf9ce28d319a9890
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jul 6 16:34:53 2018 -0700
Increase timeout for BlockingPoolTest (#5959)
---
.../io/druid/collections/BlockingPoolTest.java | 153 +++++++++++----------
1 file changed, 81 insertions(+), 72 deletions(-)
diff --git a/common/src/test/java/io/druid/collections/BlockingPoolTest.java b/common/src/test/java/io/druid/collections/BlockingPoolTest.java
index a6459a8..a1d673a 100644
--- a/common/src/test/java/io/druid/collections/BlockingPoolTest.java
+++ b/common/src/test/java/io/druid/collections/BlockingPoolTest.java
@@ -21,7 +21,9 @@ package io.druid.collections;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
-import org.junit.AfterClass;
+import io.druid.java.util.common.concurrent.Execs;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -30,7 +32,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
@@ -40,26 +41,34 @@ import static org.junit.Assert.assertTrue;
public class BlockingPoolTest
{
- private static final ExecutorService SERVICE = Executors.newFixedThreadPool(2);
+ private ExecutorService service;
- private static final DefaultBlockingPool<Integer> POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
- private static final BlockingPool<Integer> EMPTY_POOL = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
+ private DefaultBlockingPool<Integer> pool;
+ private BlockingPool<Integer> emptyPool;
- @AfterClass
- public static void teardown()
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setup()
{
- SERVICE.shutdown();
+ service = Execs.multiThreaded(2, "blocking-pool-test");
+ pool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 10);
+ emptyPool = new DefaultBlockingPool<>(Suppliers.ofInstance(1), 0);
}
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
+ @After
+ public void teardown()
+ {
+ service.shutdownNow();
+ }
@Test
public void testTakeFromEmptyPool()
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
- EMPTY_POOL.take(0);
+ emptyPool.take(0);
}
@Test
@@ -67,49 +76,49 @@ public class BlockingPoolTest
{
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Pool was initialized with limit = 0, there are no objects to take.");
- EMPTY_POOL.takeBatch(1, 0);
+ emptyPool.takeBatch(1, 0);
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testTake()
{
- final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100);
+ final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
assertNotNull(holder);
- assertEquals(9, POOL.getPoolSize());
+ assertEquals(9, pool.getPoolSize());
holder.close();
- assertEquals(10, POOL.getPoolSize());
+ assertEquals(10, pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testTakeTimeout()
{
- final List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 100L);
- final ReferenceCountingResourceHolder<Integer> holder = POOL.take(100);
+ final List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 100L);
+ final ReferenceCountingResourceHolder<Integer> holder = pool.take(100);
assertNull(holder);
batchHolder.forEach(ReferenceCountingResourceHolder::close);
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testTakeBatch()
{
- final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(6, 100L);
+ final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(6, 100L);
assertNotNull(holder);
assertEquals(6, holder.size());
- assertEquals(4, POOL.getPoolSize());
+ assertEquals(4, pool.getPoolSize());
holder.forEach(ReferenceCountingResourceHolder::close);
- assertEquals(10, POOL.getPoolSize());
+ assertEquals(10, pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testWaitAndTakeBatch() throws InterruptedException, ExecutionException
{
- List<ReferenceCountingResourceHolder<Integer>> batchHolder = POOL.takeBatch(10, 10);
+ List<ReferenceCountingResourceHolder<Integer>> batchHolder = pool.takeBatch(10, 10);
assertNotNull(batchHolder);
assertEquals(10, batchHolder.size());
- assertEquals(0, POOL.getPoolSize());
+ assertEquals(0, pool.getPoolSize());
- final Future<List<ReferenceCountingResourceHolder<Integer>>> future = SERVICE.submit(
- () -> POOL.takeBatch(8, 100)
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> future = service.submit(
+ () -> pool.takeBatch(8, 100)
);
Thread.sleep(20);
batchHolder.forEach(ReferenceCountingResourceHolder::close);
@@ -117,26 +126,26 @@ public class BlockingPoolTest
batchHolder = future.get();
assertNotNull(batchHolder);
assertEquals(8, batchHolder.size());
- assertEquals(2, POOL.getPoolSize());
+ assertEquals(2, pool.getPoolSize());
batchHolder.forEach(ReferenceCountingResourceHolder::close);
- assertEquals(10, POOL.getPoolSize());
+ assertEquals(10, pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testTakeBatchTooManyObjects()
{
- final List<ReferenceCountingResourceHolder<Integer>> holder = POOL.takeBatch(100, 100L);
+ final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(100, 100L);
assertTrue(holder.isEmpty());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testConcurrentTake() throws ExecutionException, InterruptedException
{
- final int limit1 = POOL.maxSize() / 2;
- final int limit2 = POOL.maxSize() - limit1 + 1;
+ final int limit1 = pool.maxSize() / 2;
+ final int limit2 = pool.maxSize() - limit1 + 1;
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{
@Override
@@ -144,13 +153,13 @@ public class BlockingPoolTest
{
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit1; i++) {
- result.add(POOL.take(10));
+ result.add(pool.take(10));
}
return result;
}
}
);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(
new Callable<List<ReferenceCountingResourceHolder<Integer>>>()
{
@Override
@@ -158,7 +167,7 @@ public class BlockingPoolTest
{
List<ReferenceCountingResourceHolder<Integer>> result = Lists.newArrayList();
for (int i = 0; i < limit2; i++) {
- result.add(POOL.take(10));
+ result.add(pool.take(10));
}
return result;
}
@@ -168,7 +177,7 @@ public class BlockingPoolTest
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
- assertEquals(0, POOL.getPoolSize());
+ assertEquals(0, pool.getPoolSize());
assertTrue(r1.contains(null) || r2.contains(null));
int nonNullCount = 0;
@@ -183,9 +192,9 @@ public class BlockingPoolTest
nonNullCount++;
}
}
- assertEquals(POOL.maxSize(), nonNullCount);
+ assertEquals(pool.maxSize(), nonNullCount);
- final Future future1 = SERVICE.submit(new Runnable()
+ final Future future1 = service.submit(new Runnable()
{
@Override
public void run()
@@ -197,7 +206,7 @@ public class BlockingPoolTest
}
}
});
- final Future future2 = SERVICE.submit(new Runnable()
+ final Future future2 = service.submit(new Runnable()
{
@Override
public void run()
@@ -213,50 +222,50 @@ public class BlockingPoolTest
future1.get();
future2.get();
- assertEquals(POOL.maxSize(), POOL.getPoolSize());
+ assertEquals(pool.maxSize(), pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testConcurrentTakeBatch() throws ExecutionException, InterruptedException
{
- final int batch1 = POOL.maxSize() / 2;
- final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10);
+ final int batch1 = pool.maxSize() / 2;
+ final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> pool.takeBatch(batch1, 10);
- final int batch2 = POOL.maxSize() - batch1 + 1;
- final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10);
+ final int batch2 = pool.maxSize() - batch1 + 1;
+ final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(batch2, 10);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(c1);
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
if (r1 != null) {
assertTrue(r2.isEmpty());
- assertEquals(POOL.maxSize() - batch1, POOL.getPoolSize());
+ assertEquals(pool.maxSize() - batch1, pool.getPoolSize());
assertEquals(batch1, r1.size());
r1.forEach(ReferenceCountingResourceHolder::close);
} else {
assertNotNull(r2);
- assertEquals(POOL.maxSize() - batch2, POOL.getPoolSize());
+ assertEquals(pool.maxSize() - batch2, pool.getPoolSize());
assertEquals(batch2, r2.size());
r2.forEach(ReferenceCountingResourceHolder::close);
}
- assertEquals(POOL.maxSize(), POOL.getPoolSize());
+ assertEquals(pool.maxSize(), pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testConcurrentBatchClose() throws ExecutionException, InterruptedException
{
- final int batch1 = POOL.maxSize() / 2;
- final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> POOL.takeBatch(batch1, 10);
+ final int batch1 = pool.maxSize() / 2;
+ final Callable<List<ReferenceCountingResourceHolder<Integer>>> c1 = () -> pool.takeBatch(batch1, 10);
- final int batch2 = POOL.maxSize() - batch1;
- final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(batch2, 10);
+ final int batch2 = pool.maxSize() - batch1;
+ final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(batch2, 10);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = SERVICE.submit(c1);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f1 = service.submit(c1);
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
final List<ReferenceCountingResourceHolder<Integer>> r1 = f1.get();
final List<ReferenceCountingResourceHolder<Integer>> r2 = f2.get();
@@ -265,9 +274,9 @@ public class BlockingPoolTest
assertNotNull(r2);
assertEquals(batch1, r1.size());
assertEquals(batch2, r2.size());
- assertEquals(0, POOL.getPoolSize());
+ assertEquals(0, pool.getPoolSize());
- final Future future1 = SERVICE.submit(new Runnable()
+ final Future future1 = service.submit(new Runnable()
{
@Override
public void run()
@@ -275,7 +284,7 @@ public class BlockingPoolTest
r1.forEach(ReferenceCountingResourceHolder::close);
}
});
- final Future future2 = SERVICE.submit(new Runnable()
+ final Future future2 = service.submit(new Runnable()
{
@Override
public void run()
@@ -287,18 +296,18 @@ public class BlockingPoolTest
future1.get();
future2.get();
- assertEquals(POOL.maxSize(), POOL.getPoolSize());
+ assertEquals(pool.maxSize(), pool.getPoolSize());
}
- @Test(timeout = 1000)
+ @Test(timeout = 5000)
public void testConcurrentTakeBatchClose() throws ExecutionException, InterruptedException
{
- final List<ReferenceCountingResourceHolder<Integer>> r1 = POOL.takeBatch(1, 10);
+ final List<ReferenceCountingResourceHolder<Integer>> r1 = pool.takeBatch(1, 10);
- final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> POOL.takeBatch(10, 100);
+ final Callable<List<ReferenceCountingResourceHolder<Integer>>> c2 = () -> pool.takeBatch(10, 100);
- final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = SERVICE.submit(c2);
- final Future f1 = SERVICE.submit(new Runnable()
+ final Future<List<ReferenceCountingResourceHolder<Integer>>> f2 = service.submit(c2);
+ final Future f1 = service.submit(new Runnable()
{
@Override
public void run()
@@ -317,9 +326,9 @@ public class BlockingPoolTest
f1.get();
assertNotNull(r2);
assertEquals(10, r2.size());
- assertEquals(0, POOL.getPoolSize());
+ assertEquals(0, pool.getPoolSize());
r2.forEach(ReferenceCountingResourceHolder::close);
- assertEquals(POOL.maxSize(), POOL.getPoolSize());
+ assertEquals(pool.maxSize(), pool.getPoolSize());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org