You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/05/19 00:03:01 UTC

[3/3] incubator-geode git commit: change async event pool to use all its threads

change async event pool to use all its threads

The system property gemfire.Cache.EVENT_THREAD_LIMIT can be used to configure
the number of threads used the async cache listener invocation.
It defaults to 16.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/08711555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/08711555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/08711555

Branch: refs/heads/feature/GEODE-1246
Commit: 08711555673812ea082a410bb2459b6ec092deab
Parents: f03a4c8
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed May 18 16:59:48 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed May 18 16:59:48 2016 -0700

----------------------------------------------------------------------
 .../internal/PooledExecutorWithDMStats.java     |  9 +++---
 .../internal/cache/GemFireCacheImpl.java        |  7 ++---
 .../internal/cache/GemFireCacheImplTest.java    | 32 +++++++++++++++++++-
 3 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
index 0a9f9ec..3909474 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/PooledExecutorWithDMStats.java
@@ -39,7 +39,7 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
   /** 
    * Create a new pool
    **/
-  public PooledExecutorWithDMStats(BlockingQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) {
+  public PooledExecutorWithDMStats(SynchronousQueue<Runnable> q, int maxPoolSize, PoolStatHelper stats, ThreadFactory tf, int msTimeout, RejectedExecutionHandler reh) {
     super(getCorePoolSize(maxPoolSize), maxPoolSize,
           msTimeout, TimeUnit.MILLISECONDS,
           q, tf, reh);
@@ -61,9 +61,9 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
    */
   private Thread bufferConsumer;
   
-  private static BlockingQueue<Runnable> initQ(BlockingQueue<Runnable> q) {
+  private static SynchronousQueue<Runnable> initQ(BlockingQueue<Runnable> q) {
     if (q instanceof SynchronousQueue) {
-      return q;
+      return (SynchronousQueue<Runnable>) q;
     } else {
       return new SynchronousQueue/*NoSpin*/<Runnable>();
     }
@@ -95,7 +95,8 @@ public class PooledExecutorWithDMStats extends ThreadPoolExecutor {
             try {
               for (;;) {
                 SystemFailure.checkFailure();
-                putQueue.put(takeQueue.take());
+                Runnable job = takeQueue.take();
+                putQueue.put(job);
               }
             }
             catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index d642daa..c44f3b7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -66,7 +66,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -283,6 +282,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   public static boolean DELTAS_RECALCULATE_SIZE = Boolean.getBoolean("gemfire.DELTAS_RECALCULATE_SIZE");
 
   public static final int EVENT_QUEUE_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_QUEUE_LIMIT", 4096).intValue();
+  public static final int EVENT_THREAD_LIMIT = Integer.getInteger("gemfire.Cache.EVENT_THREAD_LIMIT", 16).intValue();
 
   /**
    * System property to limit the max query-execution time. By default its turned off (-1), the time is set in MiliSecs.
@@ -887,11 +887,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
             return thread;
           }
         };
-        // @todo darrel: add stats
-        // this.cachePerfStats.getEventQueueHelper());
         ArrayBlockingQueue q = new ArrayBlockingQueue(EVENT_QUEUE_LIMIT);
-        this.eventThreadPool = new PooledExecutorWithDMStats(q, 16, this.cachePerfStats.getEventPoolHelper(), tf, 1000,
-            new CallerRunsPolicy());
+        this.eventThreadPool = new PooledExecutorWithDMStats(q, EVENT_THREAD_LIMIT, this.cachePerfStats.getEventPoolHelper(), tf, 1000);
       } else {
         this.eventThreadPool = null;
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/08711555/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
index 162f08e..e62a04a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GemFireCacheImplTest.java
@@ -18,16 +18,46 @@ package com.gemstone.gemfire.internal.cache;
 
 import static org.junit.Assert.*;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 
 import com.gemstone.gemfire.test.fake.Fakes;
+import com.jayway.awaitility.Awaitility;
 
 public class GemFireCacheImplTest {
 
   @Test
-  public void test() {
+  public void checkThatAsyncEventListenersUseAllThreadsInPool() {
     
     GemFireCacheImpl gfc = GemFireCacheImpl.createWithAsyncEventListeners(Fakes.distributedSystem(), new CacheConfig());
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) gfc.getEventThreadPool();
+    final long initialCount = executor.getCompletedTaskCount();
+    try {
+      int MAX_THREADS = GemFireCacheImpl.EVENT_THREAD_LIMIT;
+      final CountDownLatch cdl = new CountDownLatch(MAX_THREADS);
+      for (int i = 1; i <= MAX_THREADS; i++) {
+        Runnable r = new Runnable() {
+          @Override
+          public void run() {
+            cdl.countDown();
+            try {
+              cdl.await();
+            } catch (InterruptedException e) {
+            }
+          }
+        };
+        executor.execute(r);
+      }
+      Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS).timeout(15, TimeUnit.SECONDS)
+      .until(() -> {
+        return executor.getCompletedTaskCount() == MAX_THREADS+initialCount;
+      });
+    } finally {
+      executor.shutdown();
+    }
   }
 
 }