You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 20:29:50 UTC
[5/7] storm git commit: STORM-3162: Cleanup heartbeats cache and make
it thread safe
STORM-3162: Cleanup heartbeats cache and make it thread safe
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d3feb0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d3feb0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d3feb0e
Branch: refs/heads/master
Commit: 9d3feb0e18a96a90cdce9c2ff1727c9c9ecca4a1
Parents: eaed3cb
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 14 14:48:33 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 17 14:56:56 2018 -0500
----------------------------------------------------------------------
storm-client/pom.xml | 2 +-
.../org/apache/storm/daemon/worker/Worker.java | 10 +-
.../jvm/org/apache/storm/executor/Executor.java | 8 +-
.../storm/executor/bolt/BoltExecutor.java | 4 +-
.../storm/executor/spout/SpoutExecutor.java | 4 +-
.../apache/storm/stats/BoltExecutorStats.java | 10 +-
.../org/apache/storm/stats/ClientStatsUtil.java | 202 ++
.../jvm/org/apache/storm/stats/StatsUtil.java | 2610 ------------------
.../test/clj/org/apache/storm/nimbus_test.clj | 22 +-
storm-server/pom.xml | 2 +-
.../storm/daemon/nimbus/HeartbeatCache.java | 229 ++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 87 +-
.../java/org/apache/storm/stats/StatsUtil.java | 2388 ++++++++++++++++
13 files changed, 2866 insertions(+), 2712 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/pom.xml
----------------------------------------------------------------------
diff --git a/storm-client/pom.xml b/storm-client/pom.xml
index 1a120d8..2e60d19 100644
--- a/storm-client/pom.xml
+++ b/storm-client/pom.xml
@@ -164,7 +164,7 @@
<!--Note - the version would be inherited-->
<configuration>
<excludes>**/generated/**</excludes>
- <maxAllowedViolations>3285</maxAllowedViolations>
+ <maxAllowedViolations>3067</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index d02de9b..048425e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -55,7 +55,7 @@ import org.apache.storm.shade.com.google.common.base.Preconditions;
import org.apache.storm.shade.org.apache.commons.io.FileUtils;
import org.apache.storm.shade.org.apache.commons.lang.ObjectUtils;
import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
-import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.NimbusClient;
@@ -346,17 +346,17 @@ public class Worker implements Shutdownable, DaemonCommon {
Map<List<Integer>, ExecutorStats> stats;
List<IRunningExecutor> executors = this.executorsAtom.get();
if (null == executors) {
- stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
+ stats = ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
} else {
- stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
+ stats = ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
.toMap(IRunningExecutor::getExecutorId,
IRunningExecutor::renderStats)));
}
- Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
+ Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
try {
workerState.stormClusterState
.workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port,
- StatsUtil.thriftifyZkWorkerHb(zkHB));
+ ClientStatsUtil.thriftifyZkWorkerHb(zkHB));
} catch (Exception ex) {
LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index e3de2e1..2352eec 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -60,8 +60,8 @@ import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
import org.apache.storm.shade.org.json.simple.JSONValue;
import org.apache.storm.shade.org.json.simple.parser.ParseException;
+import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.CommonStats;
-import org.apache.storm.stats.StatsUtil;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
@@ -177,7 +177,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
String componentId = workerTopologyContext.getComponentId(taskIds.get(0));
String type = getExecutorType(workerTopologyContext, componentId);
- if (StatsUtil.SPOUT.equals(type)) {
+ if (ClientStatsUtil.SPOUT.equals(type)) {
executor = new SpoutExecutor(workerState, executorId, credentials);
} else {
executor = new BoltExecutor(workerState, executorId, credentials);
@@ -205,9 +205,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
Map<String, SpoutSpec> spouts = topology.get_spouts();
Map<String, Bolt> bolts = topology.get_bolts();
if (spouts.containsKey(componentId)) {
- return StatsUtil.SPOUT;
+ return ClientStatsUtil.SPOUT;
} else if (bolts.containsKey(componentId)) {
- return StatsUtil.BOLT;
+ return ClientStatsUtil.BOLT;
} else {
throw new RuntimeException("Could not find " + componentId + " in " + topology);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 1008eba..48d39c6 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -38,7 +38,7 @@ import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.stats.BoltExecutorStats;
-import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -68,7 +68,7 @@ public class BoltExecutor extends Executor {
private BoltOutputCollectorImpl outputCollector;
public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
- super(workerData, executorId, credentials, StatsUtil.BOLT);
+ super(workerData, executorId, credentials, ClientStatsUtil.BOLT);
this.executeSampler = ConfigUtils.mkStatsSampler(topoConf);
this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID);
if (isSystemBoltExecutor) {
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index cd4bb09..c46a7b9 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -37,8 +37,8 @@ import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.SpoutExecutorStats;
-import org.apache.storm.stats.StatsUtil;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.utils.ConfigUtils;
@@ -73,7 +73,7 @@ public class SpoutExecutor extends Executor {
private long threadId = 0;
public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) {
- super(workerData, executorId, credentials, StatsUtil.SPOUT);
+ super(workerData, executorId, credentials, ClientStatsUtil.SPOUT);
this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT);
this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY));
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 6fea28a..26b2d74 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -83,11 +83,11 @@ public class BoltExecutorStats extends CommonStats {
// bolt stats
BoltStats boltStats = new BoltStats(
- StatsUtil.windowSetConverter(valueStat(getAcked()), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
- StatsUtil.windowSetConverter(valueStat(getFailed()), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
- StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
- StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY),
- StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY));
+ ClientStatsUtil.windowSetConverter(valueStat(getAcked()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+ ClientStatsUtil.windowSetConverter(valueStat(getFailed()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+ ClientStatsUtil.windowSetConverter(valueStat(processLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+ ClientStatsUtil.windowSetConverter(valueStat(executedStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY),
+ ClientStatsUtil.windowSetConverter(valueStat(executeLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY));
ret.set_specific(ExecutorSpecificStats.bolt(boltStats));
return ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
new file mode 100644
index 0000000..3f63db3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+ public static final String SPOUT = "spout";
+ public static final String BOLT = "bolt";
+ static final String EXECUTOR_STATS = "executor-stats";
+ static final String UPTIME = "uptime";
+ public static final String TIME_SECS = "time-secs";
+ public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+ public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+ /**
+ * Convert a {@code List<Long>} executor to a Java {@code List<Integer>}.
+ */
+ public static List<Integer> convertExecutor(List<Long> executor) {
+ return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+ }
+
+ /**
+ * Make a map of executors to empty (null) stats.
+ * @param executors the executors as keys of the map.
+ * @return a map with one entry per executor, each mapped to null stats.
+ */
+ public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Object executor : executors) {
+ List startEnd = (List) executor;
+ ret.put(convertExecutor(startEnd), null);
+ }
+ return ret;
+ }
+
+ /**
+ * Convert the Long executor ids in ZkHbs to Integer ones, returning the result as a new Java map.
+ */
+ public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+ ret.put(convertExecutor(entry.getKey()), entry.getValue());
+ }
+ return ret;
+ }
+
+ /**
+ * Create a new worker heartbeat for zookeeper.
+ * @param topoId the topology id
+ * @param executorStats the stats for the executors
+ * @param uptime the uptime for the worker.
+ * @return the heartbeat map.
+ */
+ public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("storm-id", topoId);
+ ret.put(EXECUTOR_STATS, executorStats);
+ ret.put(UPTIME, uptime);
+ ret.put(TIME_SECS, Time.currentTimeSecs());
+
+ return ret;
+ }
+
+ private static Number getByKeyOr0(Map<String, Object> m, String k) {
+ if (m == null) {
+ return 0;
+ }
+
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
+ }
+
+ /**
+ * Get a sub-map by a given key.
+ * @param map the original map
+ * @param key the key to get it from.
+ * @return the map stored under key, or null if the given map is null.
+ */
+ public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
+ if (map == null) {
+ return null;
+ }
+ return (Map<K, V>) map.get(key);
+ }
+
+ public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
+ ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
+ ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
+ ret.set_storm_id((String) heartbeat.get("storm-id"));
+ ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
+
+ Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
+
+ Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
+ if (executorStats != null) {
+ for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
+ List<Integer> executor = entry.getKey();
+ ExecutorStats stats = entry.getValue();
+ if (null != stats) {
+ convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
+ }
+ }
+ }
+ ret.set_executor_stats(convertedStats);
+
+ return ret;
+ }
+
+ /**
+ * Converts stats to be over given windows of time.
+ * @param stats the stats
+ * @param secKeyFunc transform the sub-key
+ * @param firstKeyFunc transform the main key
+ */
+ public static <K1, K2> Map windowSetConverter(
+ Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
+ Map ret = new HashMap();
+
+ for (Object o : stats.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ K1 key1 = firstKeyFunc.transform(entry.getKey());
+
+ Map subRetMap = (Map) ret.get(key1);
+ if (subRetMap == null) {
+ subRetMap = new HashMap();
+ }
+ ret.put(key1, subRetMap);
+
+ Map value = (Map) entry.getValue();
+ for (Object oo : value.entrySet()) {
+ Map.Entry subEntry = (Map.Entry) oo;
+ K2 key2 = secKeyFunc.transform(subEntry.getKey());
+ subRetMap.put(key2, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ // =====================================================================================
+ // key transformers
+ // =====================================================================================
+
+ /**
+ * Provides a way to transform one key into another.
+ * @param <T> the type of the transformed key.
+ */
+ interface KeyTransformer<T> {
+ T transform(Object key);
+ }
+
+ static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
+ @Override
+ public GlobalStreamId transform(Object key) {
+ if (key instanceof List) {
+ List l = (List) key;
+ if (l.size() > 1) {
+ return new GlobalStreamId((String) l.get(0), (String) l.get(1));
+ }
+ }
+ return new GlobalStreamId("", key.toString());
+ }
+ }
+
+ static class IdentityTransformer implements KeyTransformer<Object> {
+ @Override
+ public Object transform(Object key) {
+ return key;
+ }
+ }
+}