You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:30 UTC

[24/38] storm git commit: STORM-2153: Replace timer with ScheduledThreadPoolExecutor

STORM-2153: Replace timer with ScheduledThreadPoolExecutor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e13f9034
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e13f9034
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e13f9034

Branch: refs/heads/1.x-branch
Commit: e13f903452585ab84e08d5a7ac3a79c43141f232
Parents: 868de5b
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Jan 3 14:48:42 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Jan 3 14:48:42 2018 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/utils/DisruptorQueue.java   | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e13f9034/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d7cf401..6ea3683 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -48,6 +48,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ public class DisruptorQueue implements IStatefulObject {
     private static final Object INTERRUPT = new Object();
     private static final String PREFIX = "disruptor-";
     private static final FlusherPool FLUSHER = new FlusherPool();
-    private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
+    private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
 
     private static int getNumFlusherPoolThreads() {
         int numThreads = 100;
@@ -437,17 +438,15 @@ public class DisruptorQueue implements IStatefulObject {
 
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
-        try {
-            METRICS_TIMER.schedule(new TimerTask() {
+        if(!METRICS_REPORTER_EXECUTOR.isShutdown()) {
+            METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
                     _disruptorMetrics.set(_metrics);
                 }
-            }, 15000, 15000);
-        } catch (IllegalStateException e){
-            // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
-            // has been cancelled. (This happens in unit tests)
+            }, 15, 15, TimeUnit.SECONDS);
         }
+
     }
 
     public String getName() {
@@ -463,7 +462,7 @@ public class DisruptorQueue implements IStatefulObject {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
             _flusher.close();
             _metrics.close();
-            METRICS_TIMER.cancel();
+            METRICS_REPORTER_EXECUTOR.shutdown();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);