You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/05/19 00:03:01 UTC
[3/3] incubator-geode git commit: change async event pool to use all
its threads
change async event pool to use all its threads
The system property gemfire.Cache.EVENT_THREAD_LIMIT can be used to configure
the number of threads used the async cache listener invocation.
It defaults to 16.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/08711555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/08711555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/08711555
Branch: refs/heads/feature/GEODE-1246
Commit: 08711555673812ea082a410bb2459b6ec092deab
Parents: f03a4c8
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed May 18 16:59:48 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed May 18 16:59:48 2016 -0700
----------------------------------------------------------------------
.../internal/PooledExecutorWithDMStats.java | 9 +++---
.../internal/cache/GemFireCacheImpl.java | 7 ++---
.../internal/cache/GemFireCacheImplTest.java | 32 +++++++++++++++++++-
3 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
index 0a9f9ec..3909474 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
@@ -39,7 +39,7 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
/**
* Create a new pool
**/
- public PooledExecutorWithDMStats(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) {
+ public PooledExecutorWithDMStats(SynchronousQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) {
super(getCorePoolSize(maxPoolSize), maxPoolSize,
msTimeout, TimeUnit.MILLISECONDS,
q, tf, reh);
@@ -61,9 +61,9 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
*/
private Thread bufferConsumer;
- private static BlockingQueue<Runnable> initQ(BlockingQueue<Runnable> q) {
+ private static SynchronousQueue<Runnable> initQ(BlockingQueue<Runnable> q) {
if (q instanceof SynchronousQueue) {
- return q;
+ return (SynchronousQueue<Runnable>) q;
} else {
return new SynchronousQueue/*NoSpin*/<Runnable>();
}
@@ -95,7 +95,8 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
try {
for (;;) {
SystemFailure.checkFailure();
- putQueue.put(takeQueue.take());
+ Runnable job = takeQueue.take();
+ putQueue.put(job);
}
}
catch (InterruptedException ie) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index d642daa..c44f3b7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -66,7 +66,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -283,6 +282,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
public static boolean DELTAS_RECALCULATE_SIZE = Boolean.getBoolean("gemfire.DELTAS_RECALCULATE_SIZE");
public static final int EVENT_QUEUE_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_QUEUE_LIMIT", 4096).intValue();
+ public static final int EVENT_THREAD_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_THREAD_LIMIT", 16).intValue();
/**
* System property to limit the max query-execution time. By default its turned off (-1), the time is set in MiliSecs.
@@ -887,11 +887,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return thread;
}
};
- // @todo darrel: add stats
- // this.cachePerfStats.getEventQueueHelper());
ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
- this.eventThreadPool = new PooledExecutorWithDMStats(q, 16, this.cachePerfStats.getEventPoolHelper(), tf, 1000,
- new CallerRunsPolicy());
+ this.eventThreadPool = new PooledExecutorWithDMStats(q, EVENT_THREAD_LIMIT, this.cachePerfStats.getEventPoolHelper(), tf, 1000);
} else {
this.eventThreadPool = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
index 162f08e..e62a04a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
@@ -18,16 +18,46 @@ package com.gemstone.gemfire.internal.cache;
import static org.junit.Assert.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Test;
import com.gemstone.gemfire.test.fake.Fakes;
+import com.jayway.awaitility.Awaitility;
public class GemFireCacheImplTest {
@Test
- public void test() {
+ public void checkThatAsyncEventListenersUseAllThreadsInPool() {
GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(Fakes.distributedSystem(), new CacheConfig());
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) gfc.getEventThreadPool();
+ final long initialCount = executor.getCompletedTaskCount();
+ try {
+ int MAX_THREADS = GemFireCacheImpl.EVENT_THREAD_LIMIT;
+ final CountDownLatch cdl = new CountDownLatch(MAX_THREADS);
+ for (int i = 1; i <= MAX_THREADS; i++) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ cdl.countDown();
+ try {
+ cdl.await();
+ } catch (InterruptedException e) {
+ }
+ }
+ };
+ executor.execute(r);
+ }
+ Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS).timeout(15, TimeUnit.SECONDS)
+ .until(() -> {
+ return executor.getCompletedTaskCount() == MAX_THREADS+initialCount;
+ });
+ } finally {
+ executor.shutdown();
+ }
}
}