You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 20:29:50 UTC

[5/7] storm git commit: STORM-3162: Cleanup heartbeats cache and make it thread safe

STORM-3162: Cleanup heartbeats cache and make it thread safe


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d3feb0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d3feb0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d3feb0e

Branch: refs/heads/master
Commit: 9d3feb0e18a96a90cdce9c2ff1727c9c9ecca4a1
Parents: eaed3cb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 14 14:48:33 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 17 14:56:56 2018 -0500

----------------------------------------------------------------------
 storm-client/pom.xml                            |    2 +-
 .../org/apache/storm/daemon/worker/Worker.java  |   10 +-
 .../jvm/org/apache/storm/executor/Executor.java |    8 +-
 .../storm/executor/bolt/BoltExecutor.java       |    4 +-
 .../storm/executor/spout/SpoutExecutor.java     |    4 +-
 .../apache/storm/stats/BoltExecutorStats.java   |   10 +-
 .../org/apache/storm/stats/ClientStatsUtil.java |  202 ++
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 2610 ------------------
 .../test/clj/org/apache/storm/nimbus_test.clj   |   22 +-
 storm-server/pom.xml                            |    2 +-
 .../storm/daemon/nimbus/HeartbeatCache.java     |  229 ++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   87 +-
 .../java/org/apache/storm/stats/StatsUtil.java  | 2388 ++++++++++++++++
 13 files changed, 2866 insertions(+), 2712 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 1a120d8..2e60d19 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -164,7 +164,7 @@
                 <!--Note - the version would be inherited-->
                 <configuration>
                     <excludes>**/generated/**</excludes>
-                    <maxAllowedViolations>3285</maxAllowedViolations>
+                    <maxAllowedViolations>3067</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d02de9b..048425e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -55,7 +55,7 @@ import org.apache.storm.shade.com.google.common.base.Preconditions;
 import org.apache.storm.shade.org.apache.commons.io.FileUtils;
 import org.apache.storm.shade.org.apache.commons.lang.ObjectUtils;
 import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
-import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.NimbusClient;
@@ -346,17 +346,17 @@ public class Worker implements Shutdownable, DaemonCommon {
         Map<List<Integer>, ExecutorStats> stats;
         List<IRunningExecutor> executors = this.executorsAtom.get();
         if (null == executors) {
-            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
+            stats = ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
         } else {
-            stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
+            stats = ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
                                                                                   .toMap(IRunningExecutor::getExecutorId,
                                                                                          IRunningExecutor::renderStats)));
         }
-        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
+        Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
         try {
             workerState.stormClusterState
                 .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
-                                 StatsUtil.thriftifyZkWorkerHb(zkHB));
+                                 ClientStatsUtil.thriftifyZkWorkerHb(zkHB));
         } catch (Exception ex) {
             LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index e3de2e1..2352eec 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -60,8 +60,8 @@ import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
 import org.apache.storm.shade.org.json.simple.JSONValue;
 import org.apache.storm.shade.org.json.simple.parser.ParseException;
+import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.stats.CommonStats;
-import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
@@ -177,7 +177,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         String componentId = workerTopologyContext.getComponentId(taskIds.get(0));
 
         String type = getExecutorType(workerTopologyContext, componentId);
-        if (StatsUtil.SPOUT.equals(type)) {
+        if (ClientStatsUtil.SPOUT.equals(type)) {
             executor = new SpoutExecutor(workerState, executorId, credentials);
         } else {
             executor = new BoltExecutor(workerState, executorId, credentials);
@@ -205,9 +205,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         Map<String, SpoutSpec> spouts = topology.get_spouts();
         Map<String, Bolt> bolts = topology.get_bolts();
         if (spouts.containsKey(componentId)) {
-            return StatsUtil.SPOUT;
+            return ClientStatsUtil.SPOUT;
         } else if (bolts.containsKey(componentId)) {
-            return StatsUtil.BOLT;
+            return ClientStatsUtil.BOLT;
         } else {
             throw new RuntimeException("Could not find " + componentId + " in " + topology);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 1008eba..48d39c6 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -38,7 +38,7 @@ import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.security.auth.IAutoCredentials;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.stats.BoltExecutorStats;
-import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -68,7 +68,7 @@ public class BoltExecutor extends Executor {
     private BoltOutputCollectorImpl outputCollector;
 
     public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
-        super(workerData, executorId, credentials, StatsUtil.BOLT);
+        super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
         this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
         this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
         if (isSystemBoltExecutor) {

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index cd4bb09..c46a7b9 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -37,8 +37,8 @@ import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.stats.SpoutExecutorStats;
-import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
@@ -73,7 +73,7 @@ public class SpoutExecutor extends Executor {
     private long threadId = 0;
 
     public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
-        super(workerData, executorId, credentials, StatsUtil.SPOUT);
+        super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
         this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
         this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
         this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 6fea28a..26b2d74 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -83,11 +83,11 @@ public class BoltExecutorStats extends CommonStats {
 
         // bolt stats
         BoltStats boltStats = new BoltStats(
-            StatsUtil.windowSetConverter(valueStat(getAcked()), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-            StatsUtil.windowSetConverter(valueStat(getFailed()), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-            StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-            StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
-            StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+            ClientStatsUtil.windowSetConverter(valueStat(getAcked()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+            ClientStatsUtil.windowSetConverter(valueStat(getFailed()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+            ClientStatsUtil.windowSetConverter(valueStat(processLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+            ClientStatsUtil.windowSetConverter(valueStat(executedStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+            ClientStatsUtil.windowSetConverter(valueStat(executeLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY));
         ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
 
         return ret;

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
new file mode 100644
index 0000000..3f63db3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+    public static final String SPOUT = "spout";
+    public static final String BOLT = "bolt";
+    static final String EXECUTOR_STATS = "executor-stats";
+    static final String UPTIME = "uptime";
+    public static final String TIME_SECS = "time-secs";
+    public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+    public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+    /**
+     * Convert a List<Long> executor to java List<Integer>.
+     */
+    public static List<Integer> convertExecutor(List<Long> executor) {
+        return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+    }
+
+    /**
+     * Make and map of executors to empty stats.
+     * @param executors the executors as keys of the map.
+     * @return and empty map of executors to stats.
+     */
+    public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+        for (Object executor : executors) {
+            List startEnd = (List) executor;
+            ret.put(convertExecutor(startEnd), null);
+        }
+        return ret;
+    }
+
+    /**
+     * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps.
+     */
+    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+        for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+            ret.put(convertExecutor(entry.getKey()), entry.getValue());
+        }
+        return ret;
+    }
+
+    /**
+     * Create a new worker heartbeat for zookeeper.
+     * @param topoId the topology id
+     * @param executorStats the stats for the executors
+     * @param uptime the uptime for the worker.
+     * @return the heartbeat map.
+     */
+    public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+        Map<String, Object> ret = new HashMap<>();
+        ret.put("storm-id", topoId);
+        ret.put(EXECUTOR_STATS, executorStats);
+        ret.put(UPTIME, uptime);
+        ret.put(TIME_SECS, Time.currentTimeSecs());
+
+        return ret;
+    }
+
+    private static Number getByKeyOr0(Map<String, Object> m, String k) {
+        if (m == null) {
+            return 0;
+        }
+
+        Number n = (Number) m.get(k);
+        if (n == null) {
+            return 0;
+        }
+        return n;
+    }
+
+    /**
+     * Get a sub-map by a given key.
+     * @param map the original map
+     * @param key the key to get it from.
+     * @return the map stored under key.
+     */
+    public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
+        if (map == null) {
+            return null;
+        }
+        return (Map<K, V>) map.get(key);
+    }
+
+    public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
+        ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
+        ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
+        ret.set_storm_id((String) heartbeat.get("storm-id"));
+        ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
+
+        Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
+
+        Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
+        if (executorStats != null) {
+            for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
+                List<Integer> executor = entry.getKey();
+                ExecutorStats stats = entry.getValue();
+                if (null != stats) {
+                    convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
+                }
+            }
+        }
+        ret.set_executor_stats(convertedStats);
+
+        return ret;
+    }
+
+    /**
+     * Converts stats to be over given windows of time.
+     * @param stats the stats
+     * @param secKeyFunc transform the sub-key
+     * @param firstKeyFunc transform the main key
+     */
+    public static <K1, K2> Map windowSetConverter(
+        Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
+        Map ret = new HashMap();
+
+        for (Object o : stats.entrySet()) {
+            Map.Entry entry = (Map.Entry) o;
+            K1 key1 = firstKeyFunc.transform(entry.getKey());
+
+            Map subRetMap = (Map) ret.get(key1);
+            if (subRetMap == null) {
+                subRetMap = new HashMap();
+            }
+            ret.put(key1, subRetMap);
+
+            Map value = (Map) entry.getValue();
+            for (Object oo : value.entrySet()) {
+                Map.Entry subEntry = (Map.Entry) oo;
+                K2 key2 = secKeyFunc.transform(subEntry.getKey());
+                subRetMap.put(key2, subEntry.getValue());
+            }
+        }
+        return ret;
+    }
+
+    // =====================================================================================
+    // key transformers
+    // =====================================================================================
+
+    /**
+     * Provides a way to transform one key into another.
+     * @param <T>
+     */
+    interface KeyTransformer<T> {
+        T transform(Object key);
+    }
+
+    static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
+        @Override
+        public GlobalStreamId transform(Object key) {
+            if (key instanceof List) {
+                List l = (List) key;
+                if (l.size() > 1) {
+                    return new GlobalStreamId((String) l.get(0), (String) l.get(1));
+                }
+            }
+            return new GlobalStreamId("", key.toString());
+        }
+    }
+
+    static class IdentityTransformer implements KeyTransformer<Object> {
+        @Override
+        public Object transform(Object key) {
+            return key;
+        }
+    }
+}