You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 20:29:48 UTC
[3/7] storm git commit: STORM-3162: Cleanup heartbeats cache and make
it thread safe
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 696cb2b..8f9f061 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -24,7 +24,7 @@
[org.apache.storm.daemon.nimbus TopoCache Nimbus Nimbus$StandaloneINimbus]
[org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
[org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
- [org.apache.storm.stats BoltExecutorStats StatsUtil]
+ [org.apache.storm.stats BoltExecutorStats StatsUtil ClientStatsUtil]
[org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper])
(:import [org.apache.storm.testing TmpPath])
@@ -166,11 +166,11 @@
(HashMap.))]
(log-warn "curr-beat:" (prn-str curr-beat) ",stats:" (prn-str stats))
(log-warn "stats type:" (type stats))
- (.put stats (StatsUtil/convertExecutor executor) (.renderStats (BoltExecutorStats. 20 (*STORM-CONF* NUM-STAT-BUCKETS))))
+ (.put stats (ClientStatsUtil/convertExecutor executor) (.renderStats (BoltExecutorStats. 20 (*STORM-CONF* NUM-STAT-BUCKETS))))
(log-warn "merged:" stats)
(.workerHeartbeat state storm-id node port
- (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))
+ (ClientStatsUtil/thriftifyZkWorkerHb (ClientStatsUtil/mkZkWorkerHb storm-id stats (int 10))))
(.sendSupervisorWorkerHeartbeat (.getNimbus cluster) (StatsUtil/thriftifyRPCWorkerHb storm-id executor))))
(defn slot-assignments [cluster storm-id]
@@ -1876,14 +1876,14 @@
(deftest do-cleanup-removes-inactive-znodes
(let [inactive-topos (list "topo2" "topo3")
- hb-cache (into {}(map vector inactive-topos '(nil nil)))
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
- (.set (.getHeartbeatsCache nimbus) hb-cache)
+ (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
+ (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo3")
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
(.doCleanup nimbus)
@@ -1909,19 +1909,19 @@
(.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo3")
;; remove topos from heartbeat cache
- (is (= (count (.get (.getHeartbeatsCache nimbus))) 0))))))
+ (is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 0))))))
(deftest do-cleanup-does-not-teardown-active-topos
(let [inactive-topos ()
- hb-cache {"topo1" nil "topo2" nil}
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
conf {NIMBUS-MONITOR-FREQ-SECS 10}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
(zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
- (.set (.getHeartbeatsCache nimbus) hb-cache)
+ (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo1")
+ (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
(.doCleanup nimbus)
@@ -1932,9 +1932,9 @@
(.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
;; hb-cache goes down to 1 because only one topo was inactive
- (is (= (count (.get (.getHeartbeatsCache nimbus))) 2))
- (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo1"))
- (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo2"))))))
+ (is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 2))
+ (is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo1"))
+ (is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo2"))))))
(deftest user-topologies-for-supervisor
(let [assignment (doto (Assignment.)
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 43d2e19..b3d74b3 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -171,7 +171,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>780</maxAllowedViolations>
+ <maxAllowedViolations>853</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
new file mode 100644
index 0000000..320f4fb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+ private static class ExecutorCache {
+ private Boolean isTimedOut;
+ private Integer nimbusTime;
+ private Integer executorReportedTime;
+
+ public ExecutorCache(Map<String, Object> newBeat) {
+ if (newBeat != null) {
+ executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ } else {
+ executorReportedTime = 0;
+ }
+
+ nimbusTime = Time.currentTimeSecs();
+ isTimedOut = false;
+ }
+
+ public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+ this.isTimedOut = isTimedOut;
+ this.nimbusTime = nimbusTime;
+ this.executorReportedTime = executorReportedTime;
+ }
+
+ public synchronized Boolean isTimedOut() {
+ return isTimedOut;
+ }
+
+ public synchronized Integer getNimbusTime() {
+ return nimbusTime;
+ }
+
+ public synchronized Integer getExecutorReportedTime() {
+ return executorReportedTime;
+ }
+
+ public synchronized void updateTimeout(Integer timeout) {
+ isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+ }
+
+ public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+ if (newBeat != null) {
+ Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+ if (!newReportedTime.equals(executorReportedTime)) {
+ nimbusTime = Time.currentTimeSecs();
+ }
+ executorReportedTime = newReportedTime;
+ }
+ updateTimeout(timeout);
+ }
+ }
+
+ //Topology Id -> executor ids -> component -> stats(...)
+ private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+ * Create an empty cache.
+ */
+ public HeartbeatCache() {
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Add an empty topology to the cache for testing purposes.
+ * @param topoId the id of the topology to add.
+ */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+ cache.put(topoId, new ConcurrentHashMap<>());
+ }
+
+ /**
+ * Get the number of topologies with cached heartbeats.
+ * @return the number of topologies with cached heartbeats.
+ */
+ @VisibleForTesting
+ public int getNumToposCached() {
+ return cache.size();
+ }
+
+ /**
+ * Get the topology ids with cached heartbeats.
+ * @return the set of topology ids with cached heartbeats.
+ */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+ return cache.keySet();
+ }
+
+ /**
+ * Remove a specific topology from the cache.
+ * @param topoId the id of the topology to remove.
+ */
+ public void removeTopo(String topoId) {
+ cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came in.
+ * @param topoId the id of the topology to look at.
+ * @param taskTimeoutSecs the timeout to know if they are too old.
+ */
+ public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ //if not executor beats, refresh is-timed-out of the cache which is done by master
+ for (ExecutorCache ec : topoCache.values()) {
+ ec.updateTimeout(taskTimeoutSecs);
+ }
+ }
+
+ /**
+ * Update the cache with heartbeats from a worker through zookeeper.
+ * @param topoId the id to the topology.
+ * @param executorBeats the HB data.
+ * @param allExecutors the executors.
+ * @param timeout the timeout.
+ */
+ public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
+ Set<List<Integer>> allExecutors, Integer timeout) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ if (executorBeats == null) {
+ executorBeats = new HashMap<>();
+ }
+
+ for (List<Integer> executor : allExecutors) {
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(timeout, newBeat);
+ }
+ }
+
+ /**
+ * Update the heartbeats for a given worker.
+ * @param workerHeartbeat the heartbeats from the worker.
+ * @param taskTimeoutSecs the timeout we should be looking at.
+ */
+ public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
+ Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
+ String topoId = workerHeartbeat.get_storm_id();
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+
+ for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+ List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
+ final Map<String, Object> newBeat = executorBeats.get(executor);
+ ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+ currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+ }
+ }
+
+ /**
+ * Get all of the alive executors for a given topology.
+ * @param topoId the id of the topology we are looking for.
+ * @param allExecutors all of the executors for this topology.
+ * @param assignment the current topology assignment.
+ * @param taskLaunchSecs timeout for right after a worker is launched.
+ * @return the set of tasks that are alive.
+ */
+ public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
+ Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+ LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
+ topoId, allExecutors, assignment, topoCache);
+
+ Set<List<Integer>> ret = new HashSet<>();
+ Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
+
+ for (List<Integer> exec : allExecutors) {
+ List<Long> longExec = new ArrayList<>(exec.size());
+ for (Integer num : exec) {
+ longExec.add(num.longValue());
+ }
+
+ Long startTime = execToStartTimes.get(longExec);
+ ExecutorCache executorCache = topoCache.get(exec);
+ //null isTimedOut means worker never reported any heartbeat
+ Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
+ Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue());
+ if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut != null && !isTimedOut))) {
+ ret.add(exec);
+ } else {
+ LOG.info("Executor {}:{} not alive", topoId, exec);
+ }
+ }
+ return ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5c48271..a33a4d0 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -190,6 +190,7 @@ import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.thrift.TException;
import org.apache.storm.utils.BufferInputStream;
@@ -297,7 +298,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Assignment oldAssignment = state.assignmentInfo(topoId, null);
state.removeStorm(topoId);
notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
- nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId));
+ nimbus.heartbeatsCache.removeTopo(topoId);
nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
return null;
};
@@ -404,7 +405,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final Object submitLock = new Object();
private final Object schedLock = new Object();
private final Object credUpdateLock = new Object();
- private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache;
+ private final HeartbeatCache heartbeatsCache;
private final AtomicBoolean heartbeatsReadyFlag;
private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy;
@SuppressWarnings("deprecation")
@@ -541,7 +542,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
stormClusterState = makeStormClusterState(conf);
}
this.stormClusterState = stormClusterState;
- this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
+ this.heartbeatsCache = new HeartbeatCache();
this.heartbeatsReadyFlag = new AtomicBoolean(false);
this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
this.downloaders = fileCacheMap(conf);
@@ -1464,7 +1465,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
@VisibleForTesting
- public AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> getHeartbeatsCache() {
+ public HeartbeatCache getHeartbeatsCache() {
return heartbeatsCache;
}
@@ -1719,23 +1720,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
Map<List<Integer>, Map<String, Object>> executorBeats =
StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
- Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId),
- executorBeats, allExecutors,
- ObjectReader.getInt(conf.get(
- DaemonConfig
- .NIMBUS_TASK_TIMEOUT_SECS)));
- heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
- }
-
- private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
- LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
- Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId);
- if (cache == null) {
- cache = new HashMap<>();
- heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
- }
- StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
- null, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+ heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors,
+ ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
}
/**
@@ -1751,27 +1737,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (zkHeartbeatTopologies.contains(topoId)) {
updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue());
} else {
- updateHeartbeats(topoId, topologyToExecutors.get(topoId), entry.getValue());
+ LOG.debug("Timing out old heartbeats for {}", topoId);
+ heartbeatsCache.timeoutOldHeartbeats(topoId, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
}
}
}
private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
- Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
- String topoId = workerHeartbeat.get_storm_id();
- Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId);
- if (cache == null) {
- cache = new HashMap<>();
- heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
- }
- Set<List<Integer>> executors = new HashSet<>();
- for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
- executors.add(Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end()));
- }
-
- StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), executorBeats, executors,
- ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
-
+ heartbeatsCache.updateHeartbeat(workerHeartbeat, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
}
private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) {
@@ -1812,36 +1785,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private Set<List<Integer>> aliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment) {
- Map<List<Integer>, Map<String, Object>> hbCache = heartbeatsCache.get().get(topoId);
- //in case that no workers report any heartbeats yet.
- if (null == hbCache) {
- hbCache = new HashMap<>();
- }
- LOG.debug("NEW Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
- topoId, allExecutors, assignment, hbCache);
-
- int taskLaunchSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
- Set<List<Integer>> ret = new HashSet<>();
- Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
-
- for (List<Integer> exec : allExecutors) {
- List<Long> longExec = new ArrayList<Long>(exec.size());
- for (Integer num : exec) {
- longExec.add(num.longValue());
- }
-
- Long startTime = execToStartTimes.get(longExec);
- Map<String, Object> executorCache = hbCache.get(StatsUtil.convertExecutor(longExec));
- //null isTimedOut means worker never reported any heartbeat
- Boolean isTimedOut = executorCache == null ? null : (Boolean) executorCache.get("is-timed-out");
- Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue());
- if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut != null && !isTimedOut))) {
- ret.add(exec);
- } else {
- LOG.info("Executor {}:{} not alive", topoId, exec);
- }
- }
- return ret;
+ return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment,
+ ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)));
}
private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
@@ -2591,7 +2536,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
rmDependencyJarsInTopology(topoId);
forceDeleteTopoDistDir(topoId);
rmTopologyKeys(topoId);
- heartbeatsCache.getAndUpdate(new Dissoc<>(topoId));
+ heartbeatsCache.removeTopo(topoId);
idToExecutors.getAndUpdate(new Dissoc<>(topoId));
}
}
@@ -3956,7 +3901,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
NodeInfo ni = entry.getValue();
ExecutorInfo execInfo = toExecInfo(entry.getKey());
Map<String, String> nodeToHost = common.assignment.get_node_host();
- Map<String, Object> heartbeat = common.beats.get(StatsUtil.convertExecutor(entry.getKey()));
+ Map<String, Object> heartbeat = common.beats.get(ClientStatsUtil.convertExecutor(entry.getKey()));
if (heartbeat == null) {
heartbeat = Collections.emptyMap();
}
@@ -4675,7 +4620,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return true;
}
- private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
+ static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
private final K key;
private final V value;
@@ -4694,7 +4639,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
// Shutdownable methods
- private static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
+ static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
private final K key;
public Dissoc(K key) {