You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:30 UTC
[24/38] storm git commit: STORM-2153: Replace timer with ScheduledThreadPoolExecutor
STORM-2153: Replace timer with ScheduledThreadPoolExecutor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e13f9034
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e13f9034
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e13f9034
Branch: refs/heads/1.x-branch
Commit: e13f903452585ab84e08d5a7ac3a79c43141f232
Parents: 868de5b
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Jan 3 14:48:42 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jan 3 14:48:42 2018 -0500
----------------------------------------------------------------------
.../jvm/org/apache/storm/utils/DisruptorQueue.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e13f9034/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d7cf401..6ea3683 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -48,6 +48,7 @@ import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ public class DisruptorQueue implements IStatefulObject {
private static final Object INTERRUPT = new Object();
private static final String PREFIX = "disruptor-";
private static final FlusherPool FLUSHER = new FlusherPool();
- private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
+ private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
private static int getNumFlusherPoolThreads() {
int numThreads = 100;
@@ -437,17 +438,15 @@ public class DisruptorQueue implements IStatefulObject {
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
- try {
- METRICS_TIMER.schedule(new TimerTask() {
+ if(!METRICS_REPORTER_EXECUTOR.isShutdown()) {
+ METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
_disruptorMetrics.set(_metrics);
}
- }, 15000, 15000);
- } catch (IllegalStateException e){
- // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
- // has been cancelled. (This happens in unit tests)
+ }, 15, 15, TimeUnit.SECONDS);
}
+
}
public String getName() {
@@ -463,7 +462,7 @@ public class DisruptorQueue implements IStatefulObject {
publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
_flusher.close();
_metrics.close();
- METRICS_TIMER.cancel();
+ METRICS_REPORTER_EXECUTOR.shutdown();
} catch (InsufficientCapacityException e) {
//This should be impossible
throw new RuntimeException(e);