You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/01 20:53:37 UTC

[2/4] storm git commit: STORM-2910: have metrics reported in the background

STORM-2910: have metrics reported in the background


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48c2fda8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48c2fda8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48c2fda8

Branch: refs/heads/master
Commit: 48c2fda867aa1d1123f6ba9c9f623ae80c9df280
Parents: ed0548e
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jan 25 09:49:20 2018 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jan 25 09:49:20 2018 -0600

----------------------------------------------------------------------
 .../storm/daemon/supervisor/Container.java      | 18 ++++---
 .../daemon/supervisor/OnlyLatestExecutor.java   | 55 ++++++++++++++++++++
 .../daemon/supervisor/ReadClusterState.java     |  6 ++-
 .../apache/storm/daemon/supervisor/Slot.java    | 33 +++++++-----
 .../storm/daemon/supervisor/Supervisor.java     | 16 +++++-
 .../storm/daemon/supervisor/SlotTest.java       | 15 +++---
 6 files changed, 112 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index f45ce25..a06e44c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -50,6 +50,7 @@ import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -709,7 +710,7 @@ public abstract class Container implements Killable {
     /**
      * Send worker metrics to Nimbus.
      */
-    void processMetrics() {
+    void processMetrics(OnlyLatestExecutor<Integer> exec) {
         try {
             if (_usedMemory.get(_port) != null) {
                 // Make sure we don't process too frequently.
@@ -725,20 +726,23 @@ public abstract class Container implements Killable {
                 long timestamp = System.currentTimeMillis();
                 double value = _usedMemory.get(_port).memory;
                 WorkerMetricPoint workerMetric = new WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID,
-                        INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
+                    INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
 
                 WorkerMetricList metricList = new WorkerMetricList();
                 metricList.add_to_metrics(workerMetric);
                 WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, hostname, metricList);
 
-                try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) {
-                    client.getClient().processWorkerMetrics(metrics);
-                }
-
-                this.lastMetricProcessTime = currentTimeMsec;
+                exec.execute(_port, () -> {
+                    try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) {
+                        client.getClient().processWorkerMetrics(metrics);
+                    } catch (Exception e) {
+                        LOG.error("Failed to process metrics", e);
+                    }
+                });
             }
         } catch (Exception e) {
             LOG.error("Failed to process metrics", e);
+        } finally {
             this.lastMetricProcessTime = System.currentTimeMillis();
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
new file mode 100644
index 0000000..bd73766
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/OnlyLatestExecutor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+
+/**
+ * This allows you to submit a Runnable with a key.  If the previous submission for that key has not yet run,
+ * it will be replaced with the latest one.
+ */
+public class OnlyLatestExecutor<K> {
+    private final Executor exec;
+    private final ConcurrentMap<K, Runnable> latest;
+
+    public OnlyLatestExecutor(Executor exec) {
+        this.exec = exec;
+        latest = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Run something in the future, replacing any earlier submission for the same key that has not started yet.
+     *
+     * @param key what to use to dedupe things.
+     * @param r what you want to run.
+     * @throws RuntimeException whatever the underlying executor throws when it refuses the task (for example a
+     *     RejectedExecutionException); the pending entry for the key is cleared first so later submissions still run.
+     */
+    public void execute(final K key, Runnable r) {
+        Runnable old = latest.put(key, r);
+        if (old == null) {
+            //It was not there before so we need to run it.
+            try {
+                exec.execute(() -> {
+                    Runnable run = latest.remove(key);
+                    if (run != null) {
+                        run.run();
+                    }
+                });
+            } catch (RuntimeException e) {
+                // Nothing was scheduled. Without this, the stale entry would make every later call for this key
+                // see a non-null previous value and never schedule a task again.
+                latest.remove(key);
+                throw e;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 884efcb..5d8bb33 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -65,7 +65,8 @@ public class ReadClusterState implements Runnable, AutoCloseable {
     private final String host;
     private final LocalState localState;
     private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
-    
+    private final OnlyLatestExecutor<Integer> metricsExec;
+
     public ReadClusterState(Supervisor supervisor) throws Exception {
         this.superConf = supervisor.getConf();
         this.stormClusterState = supervisor.getStormClusterState();
@@ -77,6 +78,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
         this.host = supervisor.getHostName();
         this.localState = supervisor.getLocalState();
         this.cachedAssignments = supervisor.getCurrAssignment();
+        this.metricsExec = new OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor());
         
         this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());
         
@@ -108,7 +110,7 @@ public class ReadClusterState implements Runnable, AutoCloseable {
 
     private Slot mkSlot(int port) throws Exception {
         return new Slot(localizer, superConf, launcher, host, port,
-                localState, stormClusterState, iSuper, cachedAssignments);
+                localState, stormClusterState, iSuper, cachedAssignments, metricsExec);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index fe30c93..6700291 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -95,12 +95,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         public final ISupervisor iSupervisor;
         public final LocalState localState;
         public final BlobChangingCallback changingCallback;
-        
+        public final OnlyLatestExecutor<Integer> metricsExec;
+
         StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
-                long killSleepMs, long monitorFreqMs,
-                ContainerLauncher containerLauncher, String host, int port,
-                ISupervisor iSupervisor, LocalState localState,
-                BlobChangingCallback changingCallback) {
+                    long killSleepMs, long monitorFreqMs,
+                    ContainerLauncher containerLauncher, String host, int port,
+                    ISupervisor iSupervisor, LocalState localState,
+                    BlobChangingCallback changingCallback,
+                    OnlyLatestExecutor<Integer> metricsExec) {
             this.localizer = localizer;
             this.hbTimeoutMs = hbTimeoutMs;
             this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -112,6 +114,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             this.iSupervisor = iSupervisor;
             this.localState = localState;
             this.changingCallback = changingCallback;
+            this.metricsExec = metricsExec;
         }
     }
 
@@ -937,7 +940,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             dynamicState = dynamicState.withProfileActions(mod, modPending);
         }
 
-        dynamicState.container.processMetrics();
+        dynamicState.container.processMetrics(staticState.metricsExec);
 
         Time.sleep(staticState.monitorFreqMs);
         return dynamicState;
@@ -971,14 +974,17 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
     private volatile boolean done = false;
     private volatile DynamicState dynamicState;
     private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments;
-    
+    private final OnlyLatestExecutor<Integer> metricsExec;
+
     public Slot(AsyncLocalizer localizer, Map<String, Object> conf,
-            ContainerLauncher containerLauncher, String host,
-            int port, LocalState localState,
-            IStormClusterState clusterState,
-            ISupervisor iSupervisor,
-            AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments) throws Exception {
+                ContainerLauncher containerLauncher, String host,
+                int port, LocalState localState,
+                IStormClusterState clusterState,
+                ISupervisor iSupervisor,
+                AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments,
+                OnlyLatestExecutor<Integer> metricsExec) throws Exception {
         super("SLOT_"+port);
+        this.metricsExec = metricsExec;
 
         this.cachedCurrentAssignments = cachedCurrentAssignments;
         this.clusterState = clusterState;
@@ -1024,7 +1030,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             port,
             iSupervisor,
             localState,
-            this);
+            this,
+            metricsExec);
         this.newAssignment.set(dynamicState.newAssignment);
         if (MachineState.RUNNING == dynamicState.state) {
             //We are running so we should recover the blobs.

http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ee5a55c..147a8aa 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -27,6 +27,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.FileUtils;
@@ -74,6 +76,10 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
     private final AtomicReference<Map<Long, LocalAssignment>> currAssignment;
     private final StormTimer heartbeatTimer;
     private final StormTimer eventTimer;
+    //Right now this is only used for sending metrics to nimbus,
+    // but we may want to combine it with the heartbeatTimer at some point
+    // to really make this work well.
+    private final ExecutorService heartbeatExecutor;
     private final AsyncLocalizer asyncLocalizer;
     private EventManager eventManager;
     private ReadClusterState readState;
@@ -89,6 +95,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
         this.upTime = Utils.makeUptimeComputer();
         this.stormVersion = VersionInfo.getVersion();
         this.sharedContext = sharedContext;
+        this.heartbeatExecutor = Executors.newFixedThreadPool(1);
         
         iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf));
         
@@ -125,7 +132,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable {
 
         this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
     }
-    
+
+    /**
+     * Returns the executor used for background supervisor work, currently sending worker metrics to Nimbus.
+     */
+    public ExecutorService getHeartbeatExecutor() {
+        return this.heartbeatExecutor;
+    }
+
     public String getId() {
         return supervisorId;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/48c2fda8/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index f627bc7..08a5266 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.daemon.supervisor;
 
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.*;
@@ -150,7 +149,7 @@ public class SlotTest {
             ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000,
-                    containerLauncher, "localhost", 8080, iSuper, state, cb);
+                    containerLauncher, "localhost", 8080, iSuper, state, cb, null);
             DynamicState dynamicState = new DynamicState(null, null, null);
             DynamicState nextState = Slot.handleEmpty(dynamicState, staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
@@ -182,7 +181,7 @@ public class SlotTest {
             
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb);
+                    containerLauncher, "localhost", port, iSuper, state, cb, null);
             DynamicState dynamicState = new DynamicState(null, null, null)
                     .withNewAssignment(newAssignment);
 
@@ -251,7 +250,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb);
+                    containerLauncher, "localhost", port, iSuper, state, cb, null);
             DynamicState dynamicState = new DynamicState(assignment, container, assignment);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -312,7 +311,7 @@ public class SlotTest {
             
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb);
+                    containerLauncher, "localhost", port, iSuper, state, cb, null);
             DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -393,7 +392,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb);
+                    containerLauncher, "localhost", port, iSuper, state, cb, null);
             DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState);
@@ -454,7 +453,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb);
+                    containerLauncher, "localhost", port, iSuper, state, cb, null);
             Set<TopoProfileAction> profileActions = new HashSet<>();
             ProfileRequest request = new ProfileRequest();
             request.set_action(ProfileAction.JPROFILE_STOP);
@@ -532,7 +531,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             long heartbeatTimeoutMs = 5000;
             StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000,
-                containerLauncher, "localhost", port, iSuper, state, cb);
+                containerLauncher, "localhost", port, iSuper, state, cb, null);
 
             Set<Slot.BlobChanging> changing = new HashSet<>();
             LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class);