You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/24 15:21:52 UTC

[2/2] git commit: Use a bounded blocking queue for fetching operations

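Use a bounded blocking queue for fetching operations: the fetching thread pool now queues tasks in an ArrayBlockingQueue instead of a SynchronousQueue. The queue capacity is configurable through the optional s4.checkpointing.fetchingQueueSize property (default: 100), and the pool's core size is lowered from 1 to 0.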


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/15bb03c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/15bb03c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/15bb03c2

Branch: refs/heads/piper
Commit: 15bb03c226f93ed7d23216d3f0b0b3a50fe22ba5
Parents: 1f1ad90
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jul 24 09:21:50 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jul 24 09:21:50 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/core/ft/SafeKeeper.java     |    9 ++++++---
 1 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/15bb03c2/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
index 4c8358f..3b3ec6b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -121,6 +120,10 @@ public final class SafeKeeper implements CheckpointingFramework {
     @Named("s4.checkpointing.fetchingDisabledDurationMs")
     long fetchingDisabledDurationMs = 600000;
 
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingQueueSize")
+    int fetchingQueueSize = 100;
+
     long fetchingDisabledInitTime = -1;
     AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
 
@@ -156,8 +159,8 @@ public final class SafeKeeper implements CheckpointingFramework {
 
         ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
                 .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
-        fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
-                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+        fetchingThreadPool = new ThreadPoolExecutor(0, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(fetchingQueueSize), fetchingThreadFactory);
         fetchingThreadPool.allowCoreThreadTimeOut(true);
 
     }