You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/24 15:21:52 UTC
[2/2] git commit: Use a bounded blocking queue for fetching operations
Use a bounded blocking queue for fetching operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/15bb03c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/15bb03c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/15bb03c2
Branch: refs/heads/piper
Commit: 15bb03c226f93ed7d23216d3f0b0b3a50fe22ba5
Parents: 1f1ad90
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jul 24 09:21:50 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jul 24 09:21:50 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/core/ft/SafeKeeper.java | 9 ++++++---
1 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/15bb03c2/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
index 4c8358f..3b3ec6b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -22,7 +22,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -121,6 +120,10 @@ public final class SafeKeeper implements CheckpointingFramework {
@Named("s4.checkpointing.fetchingDisabledDurationMs")
long fetchingDisabledDurationMs = 600000;
+ @Inject(optional = true)
+ @Named("s4.checkpointing.fetchingQueueSize")
+ int fetchingQueueSize = 100;
+
long fetchingDisabledInitTime = -1;
AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
@@ -156,8 +159,8 @@ public final class SafeKeeper implements CheckpointingFramework {
ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
.setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
- fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
- TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+ fetchingThreadPool = new ThreadPoolExecutor(0, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(fetchingQueueSize), fetchingThreadFactory);
fetchingThreadPool.allowCoreThreadTimeOut(true);
}