You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2018/09/15 20:11:39 UTC
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
GitHub user revans2 opened a pull request:
https://github.com/apache/storm/pull/2836
STORM-3162: Cleanup heartbeats cache and make it thread safe
This is an alternative to #2800
@zd-project I had a lot of trouble understanding what was happening with the heartbeat cache: it was spread across so many files that I couldn't be sure whether #2800 would fix the issues.
I decided that I needed to refactor the code to really understand how it worked, and provide a clean interface to the cache, which is what I did. Once I had that, it became clear to me that #2800 would fix the immediate issue, but there were other races in the code, especially with the caches for the executors. These are probably not a big deal, but I felt it would be best to just fix them too.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/revans2/incubator-storm STORM-3162
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2836.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2836
----
commit 168a2075363be53d131f4ded41d29332ddce0f7b
Author: Robert (Bobby) Evans <ev...@...>
Date: 2018-09-14T19:48:33Z
STORM-3162: Cleanup heartbeats cache and make it thread safe
----
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218142320
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
+ * @param topoId the id of the topology to look at.
+ * @param taskTimeoutSecs the timeout to know if they are too old.
+ */
+ public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ //if not executor beats, refresh is-timed-out of the cache which is done by master
+ for (ExecutorCache ec : topoCache.values()) {
+ ec.updateTimeout(taskTimeoutSecs);
+ }
+ }
+
+ /**
+ * Update the cache with heartbeats from a worker through zookeeper.
+ * @param topoId the id to the topology.
+ * @param executorBeats the HB data.
+ * @param allExecutors the executors.
+ * @param timeout the timeout.
+ */
+ public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
+ Set<List<Integer>> allExecutors, Integer timeout) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ if (executorBeats == null) {
+ executorBeats = new HashMap<>();
+ }
+
+ for (List<Integer> executor : allExecutors) {
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(timeout, newBeat);
+ }
+ }
+
+ /**
+ * Update the heartbeats for a given worker.
+ * @param workerHeartbeat the heartbeats from the worker.
+ * @param taskTimeoutSecs the timeout we should be looking at.
+ */
+ public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
+ Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
+ String topoId = workerHeartbeat.get_storm_id();
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+
+ for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+ List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+ }
+ }
+
+ /**
+ * Get all of the alive executors for a given topology.
+ * @param topoId the id of the topology we are looking for.
+ * @param allExecutors all of the executors for this topology.
+ * @param assignment the current topology assignment.
+ * @param taskLaunchSecs timeout for right after a worker is launched.
+ * @return the set of tasks that are alive.
+ */
+ public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
+ topoId, allExecutors, assignment, topoCache);
+
+ Set<List<Integer>> ret = new HashSet<>();
+ Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
+
+ for (List<Integer> exec : allExecutors) {
+ List<Long> longExec = new ArrayList<>(exec.size());
+ for (Integer num : exec) {
+ longExec.add(num.longValue());
+ }
+
+ Long startTime = execToStartTimes.get(longExec);
+ ExecutorCache executorCache = topoCache.get(exec);
+ //null isTimedOut means worker never reported any heartbeat
+ Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
--- End diff --
Nit: Would probably be simpler to set isTimedOut to false if executorCache is null. That avoids the extra null check below.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218121407
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>780</maxAllowedViolations>
+ <maxAllowedViolations>853</maxAllowedViolations>
--- End diff --
This number seems to be moving in the wrong direction :)
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218177587
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>780</maxAllowedViolations>
+ <maxAllowedViolations>853</maxAllowedViolations>
--- End diff --
No, it's fine.
---
[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2836
Looks good, thanks. +1
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218173878
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
--- End diff --
Yes it is very confusing. I'll fix it.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218086663
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+ public static final String SPOUT = "spout";
+ public static final String BOLT = "bolt";
+ static final String EXECUTOR_STATS = "executor-stats";
+ static final String UPTIME = "uptime";
+ public static final String TIME_SECS = "time-secs";
+ public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+ public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+ /**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+ public static List<Integer> convertExecutor(List<Long> executor) {
+ return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+ }
+
+ /**
+ * Make and map of executors to empty stats.
+ * @param executors the executors as keys of the map.
+ * @return and empty map of executors to stats.
+ */
+ public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Object executor : executors) {
+ List startEnd = (List) executor;
+ ret.put(convertExecutor(startEnd), null);
+ }
+ return ret;
+ }
+
+ /**
+ * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps.
+ */
+ public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+ ret.put(convertExecutor(entry.getKey()), entry.getValue());
+ }
+ return ret;
+ }
+
+ /**
+ * Create a new worker heartbeat for zookeeper.
+ * @param topoId the topology id
+ * @param executorStats the stats for the executors
+ * @param uptime the uptime for the worker.
+ * @return the heartbeat map.
+ */
+ public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("storm-id", topoId);
+ ret.put(EXECUTOR_STATS, executorStats);
+ ret.put(UPTIME, uptime);
+ ret.put(TIME_SECS, Time.currentTimeSecs());
+
+ return ret;
+ }
+
+ private static Number getByKeyOr0(Map<String, Object> m, String k) {
+ if (m == null) {
+ return 0;
+ }
+
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
+ }
+
+ /**
+ * Get a sub-map by a given key.
+ * @param map the original map
+ * @param key the key to get it from.
+ * @return the map stored under key.
+ */
+ public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
+ if (map == null) {
+ return null;
+ }
+ return (Map<K, V>) map.get(key);
+ }
+
+ public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
+ ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
+ ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
+ ret.set_storm_id((String) heartbeat.get("storm-id"));
+ ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
+
+ Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
+
+ Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
+ if (executorStats != null) {
+ for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
+ List<Integer> executor = entry.getKey();
+ ExecutorStats stats = entry.getValue();
+ if (null != stats) {
+ convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
+ }
+ }
+ }
+ ret.set_executor_stats(convertedStats);
+
+ return ret;
+ }
+
+ /**
+ * Converts stats to be over given windows of time.
+ * @param stats the stats
+ * @param secKeyFunc transform the sub-key
+ * @param firstKeyFunc transform the main key
+ */
+ public static <K1, K2> Map windowSetConverter(
+ Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
+ Map ret = new HashMap();
+
+ for (Object o : stats.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ K1 key1 = firstKeyFunc.transform(entry.getKey());
+
+ Map subRetMap = (Map) ret.get(key1);
+ if (subRetMap == null) {
+ subRetMap = new HashMap();
+ }
+ ret.put(key1, subRetMap);
+
+ Map value = (Map) entry.getValue();
+ for (Object oo : value.entrySet()) {
+ Map.Entry subEntry = (Map.Entry) oo;
+ K2 key2 = secKeyFunc.transform(subEntry.getKey());
+ subRetMap.put(key2, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ // =====================================================================================
+ // key transformers
+ // =====================================================================================
+
+ /**
+ * Provides a way to transform one key into another.
+ * @param <T>
+ */
+ interface KeyTransformer<T> {
+ T transform(Object key);
+ }
+
+ static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
+ @Override
+ public GlobalStreamId transform(Object key) {
+ if (key instanceof List) {
+ List l = (List) key;
+ if (l.size() > 1) {
+ return new GlobalStreamId((String) l.get(0), (String) l.get(1));
+ }
+ }
+ return new GlobalStreamId("", key.toString());
--- End diff --
Should this be marked to _DEFAULT_STREAM_ID_ or is this expected to be ignored?
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218174140
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException,
return true;
}
- private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
+ static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
--- End diff --
Yes, I'll make them private again. The visibility was widened because, while refactoring, I kept Assoc and Dissoc around until the very end, when I fixed the threading.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218088968
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+ public static final String SPOUT = "spout";
+ public static final String BOLT = "bolt";
+ static final String EXECUTOR_STATS = "executor-stats";
+ static final String UPTIME = "uptime";
+ public static final String TIME_SECS = "time-secs";
+ public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+ public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+ /**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+ public static List<Integer> convertExecutor(List<Long> executor) {
+ return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+ }
+
+ /**
+ * Make and map of executors to empty stats.
+ * @param executors the executors as keys of the map.
+ * @return and empty map of executors to stats.
+ */
+ public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Object executor : executors) {
+ List startEnd = (List) executor;
+ ret.put(convertExecutor(startEnd), null);
+ }
+ return ret;
+ }
+
+ /**
+ * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps.
+ */
+ public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
+ ret.put(convertExecutor(entry.getKey()), entry.getValue());
+ }
+ return ret;
+ }
+
+ /**
+ * Create a new worker heartbeat for zookeeper.
+ * @param topoId the topology id
+ * @param executorStats the stats for the executors
+ * @param uptime the uptime for the worker.
+ * @return the heartbeat map.
+ */
+ public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("storm-id", topoId);
+ ret.put(EXECUTOR_STATS, executorStats);
+ ret.put(UPTIME, uptime);
+ ret.put(TIME_SECS, Time.currentTimeSecs());
+
+ return ret;
+ }
+
+ private static Number getByKeyOr0(Map<String, Object> m, String k) {
+ if (m == null) {
+ return 0;
+ }
+
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
+ }
+
+ /**
+ * Get a sub-map by a given key.
+ * @param map the original map
+ * @param key the key to get it from.
+ * @return the map stored under key.
+ */
+ public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
+ if (map == null) {
+ return null;
+ }
+ return (Map<K, V>) map.get(key);
+ }
+
+ public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
+ ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
+ ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
+ ret.set_storm_id((String) heartbeat.get("storm-id"));
+ ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
+
+ Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
+
+ Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
+ if (executorStats != null) {
+ for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
+ List<Integer> executor = entry.getKey();
+ ExecutorStats stats = entry.getValue();
+ if (null != stats) {
+ convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
+ }
+ }
+ }
+ ret.set_executor_stats(convertedStats);
+
+ return ret;
+ }
+
+ /**
+ * Converts stats to be over given windows of time.
+ * @param stats the stats
+ * @param secKeyFunc transform the sub-key
+ * @param firstKeyFunc transform the main key
+ */
+ public static <K1, K2> Map windowSetConverter(
+ Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
+ Map ret = new HashMap();
+
+ for (Object o : stats.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ K1 key1 = firstKeyFunc.transform(entry.getKey());
+
+ Map subRetMap = (Map) ret.get(key1);
+ if (subRetMap == null) {
+ subRetMap = new HashMap();
+ }
+ ret.put(key1, subRetMap);
+
+ Map value = (Map) entry.getValue();
+ for (Object oo : value.entrySet()) {
+ Map.Entry subEntry = (Map.Entry) oo;
+ K2 key2 = secKeyFunc.transform(subEntry.getKey());
+ subRetMap.put(key2, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ // =====================================================================================
+ // key transformers
+ // =====================================================================================
+
+ /**
+ * Provides a way to transform one key into another.
+ * @param <T>
+ */
+ interface KeyTransformer<T> {
+ T transform(Object key);
+ }
+
+ static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
+ @Override
+ public GlobalStreamId transform(Object key) {
+ if (key instanceof List) {
+ List l = (List) key;
+ if (l.size() > 1) {
+ return new GlobalStreamId((String) l.get(0), (String) l.get(1));
+ }
+ }
+ return new GlobalStreamId("", key.toString());
--- End diff --
This code was just moved from one class (StatsUtil.java) to another (ClientStatsUtil.java). If there is a bug here, it was there before, and we can address it in a follow-on JIRA.
https://github.com/apache/storm/blob/4605ae0a34858309171e726e1924b9a37695c977/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L2585
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2836
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218124022
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
+ * @param topoId the id of the topology to look at.
+ * @param taskTimeoutSecs the timeout to know if they are too old.
+ */
+ public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ //if not executor beats, refresh is-timed-out of the cache which is done by master
--- End diff --
Does this comment still make sense?
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218173746
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
+ * @param topoId the id of the topology to look at.
+ * @param taskTimeoutSecs the timeout to know if they are too old.
+ */
+ public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ //if not executor beats, refresh is-timed-out of the cache which is done by master
--- End diff --
The comment came from the original code as I was doing refactoring. I'll make it better.
---
[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2836
@srdo I think I addressed all of your review comments.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218173536
--- Diff: storm-server/pom.xml ---
@@ -171,7 +171,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>780</maxAllowedViolations>
+ <maxAllowedViolations>853</maxAllowedViolations>
--- End diff --
Yes, these are all the issues that came with StatsUtil, which is why the other one went in a good direction. I can try to clean them up more if you want.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218140867
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException,
return true;
}
- private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
+ static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
--- End diff --
Assoc and Dissoc don't seem to be used in new places, so is making them package private still necessary?
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r217911059
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+ public static final String SPOUT = "spout";
+ public static final String BOLT = "bolt";
+ static final String EXECUTOR_STATS = "executor-stats";
+ static final String UPTIME = "uptime";
+ public static final String TIME_SECS = "time-secs";
+ public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
+ public static final IdentityTransformer IDENTITY = new IdentityTransformer();
+
+ /**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+ public static List<Integer> convertExecutor(List<Long> executor) {
+ return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
+ }
+
+ /**
+ * Make and map of executors to empty stats.
--- End diff --
Nit: There are a couple of places that say "and" where it should say "a"/"an".
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218140315
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
--- End diff --
I don't understand what this is saying.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218174414
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
+ * @param topoId the id of the topology to look at.
+ * @param taskTimeoutSecs the timeout to know if they are too old.
+ */
+ public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ //if not executor beats, refresh is-timed-out of the cache which is done by master
+ for (ExecutorCache ec : topoCache.values()) {
+ ec.updateTimeout(taskTimeoutSecs);
+ }
+ }
+
+ /**
+ * Update the cache with heartbeats from a worker through zookeeper.
+ * @param topoId the id to the topology.
+ * @param executorBeats the HB data.
+ * @param allExecutors the executors.
+ * @param timeout the timeout.
+ */
+ public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
+ Set<List<Integer>> allExecutors, Integer timeout) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ if (executorBeats == null) {
+ executorBeats = new HashMap<>();
+ }
+
+ for (List<Integer> executor : allExecutors) {
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(timeout, newBeat);
+ }
+ }
+
+ /**
+ * Update the heartbeats for a given worker.
+ * @param workerHeartbeat the heartbeats from the worker.
+ * @param taskTimeoutSecs the timeout we should be looking at.
+ */
+ public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
+ Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
+ String topoId = workerHeartbeat.get_storm_id();
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+
+ for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+ List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+ }
+ }
+
+ /**
+ * Get all of the alive executors for a given topology.
+ * @param topoId the id of the topology we are looking for.
+ * @param allExecutors all of the executors for this topology.
+ * @param assignment the current topology assignment.
+ * @param taskLaunchSecs timeout for right after a worker is launched.
+ * @return the set of tasks that are alive.
+ */
+ public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
+ topoId, allExecutors, assignment, topoCache);
+
+ Set<List<Integer>> ret = new HashSet<>();
+ Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
+
+ for (List<Integer> exec : allExecutors) {
+ List<Long> longExec = new ArrayList<>(exec.size());
+ for (Integer num : exec) {
+ longExec.add(num.longValue());
+ }
+
+ Long startTime = execToStartTimes.get(longExec);
+ ExecutorCache executorCache = topoCache.get(exec);
+ //null isTimedOut means worker never reported any heartbeat
+ Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
--- End diff --
Yes, I agree; I will do it. Again, this was code that was already there and I just refactored it, but I am happy to fix it.
---
[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...
Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r217911368
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
--- End diff --
Please rename to include the unit of time.
---