Posted to commits@storm.apache.org by bo...@apache.org on 2018/09/17 20:29:48 UTC

[3/7] storm git commit: STORM-3162: Cleanup heartbeats cache and make it thread safe

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 696cb2b..8f9f061 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -24,7 +24,7 @@
            [org.apache.storm.daemon.nimbus TopoCache Nimbus Nimbus$StandaloneINimbus]
            [org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
            [org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
-           [org.apache.storm.stats BoltExecutorStats StatsUtil]
+           [org.apache.storm.stats BoltExecutorStats StatsUtil ClientStatsUtil]
            [org.apache.storm.security.auth IGroupMappingServiceProvider IAuthorizer])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.testing TmpPath])
@@ -166,11 +166,11 @@
                 (HashMap.))]
     (log-warn "curr-beat:" (prn-str curr-beat) ",stats:" (prn-str stats))
     (log-warn "stats type:" (type stats))
-    (.put stats (StatsUtil/convertExecutor executor) (.renderStats (BoltExecutorStats. 20 (*STORM-CONF* NUM-STAT-BUCKETS))))
+    (.put stats (ClientStatsUtil/convertExecutor executor) (.renderStats (BoltExecutorStats. 20 (*STORM-CONF* NUM-STAT-BUCKETS))))
     (log-warn "merged:" stats)
 
     (.workerHeartbeat state storm-id node port
-      (StatsUtil/thriftifyZkWorkerHb (StatsUtil/mkZkWorkerHb storm-id stats (int 10))))
+      (ClientStatsUtil/thriftifyZkWorkerHb (ClientStatsUtil/mkZkWorkerHb storm-id stats (int 10))))
     (.sendSupervisorWorkerHeartbeat (.getNimbus cluster) (StatsUtil/thriftifyRPCWorkerHb storm-id executor))))
 
 (defn slot-assignments [cluster storm-id]
@@ -1876,14 +1876,14 @@
 
 (deftest do-cleanup-removes-inactive-znodes
   (let [inactive-topos (list "topo2" "topo3")
-        hb-cache (into {}(map vector inactive-topos '(nil nil)))
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10 NIMBUS-TOPOLOGY-BLOBSTORE-DELETION-DELAY-MS 0}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
-        (.set (.getHeartbeatsCache nimbus) hb-cache)
+        (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
+        (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo3")
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
 
           (.doCleanup nimbus)
@@ -1909,19 +1909,19 @@
           (.rmDependencyJarsInTopology (Mockito/verify nimbus) "topo3")
 
           ;; remove topos from heartbeat cache
-          (is (= (count (.get (.getHeartbeatsCache nimbus))) 0))))))
+          (is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 0))))))
 
 
 (deftest do-cleanup-does-not-teardown-active-topos
   (let [inactive-topos ()
-        hb-cache {"topo1" nil "topo2" nil}
         mock-state (mock-cluster-state)
         mock-blob-store (Mockito/mock BlobStore)
         conf {NIMBUS-MONITOR-FREQ-SECS 10}]
     (with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
                     (zkLeaderElectorImpl [conf zk blob-store tc cluster-state acls metrics-registry] (MockLeaderElector. ))))]
       (let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil (StormMetricsRegistry.)))]
-        (.set (.getHeartbeatsCache nimbus) hb-cache)
+        (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo1")
+        (.addEmptyTopoForTests (.getHeartbeatsCache nimbus) "topo2")
         (.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
 
           (.doCleanup nimbus)
@@ -1932,9 +1932,9 @@
           (.rmTopologyKeys (Mockito/verify nimbus (Mockito/times 0)) (Mockito/anyObject))
 
          ;; hb-cache stays at 2 because no topos were inactive
-          (is (= (count (.get (.getHeartbeatsCache nimbus))) 2))
-          (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo1"))
-          (is (contains? (.get (.getHeartbeatsCache nimbus)) "topo2"))))))
+          (is (= (.getNumToposCached (.getHeartbeatsCache nimbus)) 2))
+          (is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo1"))
+          (is (contains? (.getTopologyIds (.getHeartbeatsCache nimbus)) "topo2"))))))
 
 (deftest user-topologies-for-supervisor
   (let [assignment (doto (Assignment.)

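For reference, the test changes above swap direct manipulation of the old AtomicReference-backed map for the new test-only accessors on HeartbeatCache. A minimal Java sketch of the same checks follows; the assertions and the harness around them are illustrative, not part of the commit:

    import org.apache.storm.daemon.nimbus.HeartbeatCache;

    public class HeartbeatCacheSketch {
        public static void main(String[] args) {
            HeartbeatCache hbCache = new HeartbeatCache();
            hbCache.addEmptyTopoForTests("topo1");
            hbCache.addEmptyTopoForTests("topo2");

            // Mirrors do-cleanup-does-not-teardown-active-topos: with no
            // inactive topologies, both entries survive cleanup.
            assert hbCache.getNumToposCached() == 2;
            assert hbCache.getTopologyIds().contains("topo1");
            assert hbCache.getTopologyIds().contains("topo2");

            // Mirrors do-cleanup-removes-inactive-znodes: inactive
            // topologies are dropped from the cache.
            hbCache.removeTopo("topo1");
            hbCache.removeTopo("topo2");
            assert hbCache.getNumToposCached() == 0;
        }
    }
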
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 43d2e19..b3d74b3 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -171,7 +171,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>780</maxAllowedViolations>
+                    <maxAllowedViolations>853</maxAllowedViolations>
                 </configuration>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
new file mode 100644
index 0000000..320f4fb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+    private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
+
+    private static class ExecutorCache {
+        private Boolean isTimedOut;
+        private Integer nimbusTime;
+        private Integer executorReportedTime;
+
+        public ExecutorCache(Map<String, Object> newBeat) {
+            if (newBeat != null) {
+                executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+            } else {
+                executorReportedTime = 0;
+            }
+
+            nimbusTime = Time.currentTimeSecs();
+            isTimedOut = false;
+        }
+
+        public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+            this.isTimedOut = isTimedOut;
+            this.nimbusTime = nimbusTime;
+            this.executorReportedTime = executorReportedTime;
+        }
+
+        public synchronized Boolean isTimedOut() {
+            return isTimedOut;
+        }
+
+        public synchronized Integer getNimbusTime() {
+            return nimbusTime;
+        }
+
+        public synchronized Integer getExecutorReportedTime() {
+            return executorReportedTime;
+        }
+
+        public synchronized void updateTimeout(Integer timeout) {
+            isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
+        }
+
+        public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
+            if (newBeat != null) {
+                Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+                if (!newReportedTime.equals(executorReportedTime)) {
+                    nimbusTime = Time.currentTimeSecs();
+                }
+                executorReportedTime = newReportedTime;
+            }
+            updateTimeout(timeout);
+        }
+    }
+
+    //Topology Id -> executor id -> cached heartbeat state for that executor
+    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+    /**
+     * Create an empty cache.
+     */
+    public HeartbeatCache() {
+        this.cache = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Add an empty topology to the cache for testing purposes.
+     * @param topoId the id of the topology to add.
+     */
+    @VisibleForTesting
+    public void addEmptyTopoForTests(String topoId) {
+        cache.put(topoId, new ConcurrentHashMap<>());
+    }
+
+    /**
+     * Get the number of topologies with cached heartbeats.
+     * @return the number of topologies with cached heartbeats.
+     */
+    @VisibleForTesting
+    public int getNumToposCached() {
+        return cache.size();
+    }
+
+    /**
+     * Get the topology ids with cached heartbeats.
+     * @return the set of topology ids with cached heartbeats.
+     */
+    @VisibleForTesting
+    public Set<String> getTopologyIds() {
+        return cache.keySet();
+    }
+
+    /**
+     * Remove a specific topology from the cache.
+     * @param topoId the id of the topology to remove.
+     */
+    public void removeTopo(String topoId) {
+        cache.remove(topoId);
+    }
+
+    /**
+     * Time out any stale cached heartbeats for a topology when no new heartbeats came in.
+     * @param topoId the id of the topology to look at.
+     * @param taskTimeoutSecs the timeout used to decide whether a heartbeat is too old.
+     */
+    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
+        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+        //if no executor beats came in, the master refreshes the is-timed-out flag of each cached entry
+        for (ExecutorCache ec : topoCache.values()) {
+            ec.updateTimeout(taskTimeoutSecs);
+        }
+    }
+
+    /**
+     * Update the cache with heartbeats from a worker through zookeeper.
+     * @param topoId the id of the topology.
+     * @param executorBeats the heartbeat data, keyed by executor.
+     * @param allExecutors all executors assigned to the topology.
+     * @param timeout the task timeout in seconds.
+     */
+    public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
+                                      Set<List<Integer>> allExecutors, Integer timeout) {
+        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+        if (executorBeats == null) {
+            executorBeats = new HashMap<>();
+        }
+
+        for (List<Integer> executor : allExecutors) {
+            final Map<String, Object> newBeat = executorBeats.get(executor);
+            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+            currBeat.updateFromHb(timeout, newBeat);
+        }
+    }
+
+    /**
+     * Update the heartbeats for a given worker.
+     * @param workerHeartbeat the heartbeats from the worker.
+     * @param taskTimeoutSecs the task timeout in seconds.
+     */
+    public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
+        Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
+        String topoId = workerHeartbeat.get_storm_id();
+        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+
+        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+            List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
+            final Map<String, Object> newBeat = executorBeats.get(executor);
+            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
+            currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+        }
+    }
+
+    /**
+     * Get all of the alive executors for a given topology.
+     * @param topoId the id of the topology we are looking for.
+     * @param allExecutors all of the executors for this topology.
+     * @param assignment the current topology assignment.
+     * @param taskLaunchSecs the grace period applied right after a worker is launched.
+     * @return the set of tasks that are alive.
+     */
+    public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
+        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
+        LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
+            topoId, allExecutors, assignment, topoCache);
+
+        Set<List<Integer>> ret = new HashSet<>();
+        Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
+
+        for (List<Integer> exec : allExecutors) {
+            List<Long> longExec = new ArrayList<>(exec.size());
+            for (Integer num : exec) {
+                longExec.add(num.longValue());
+            }
+
+            Long startTime = execToStartTimes.get(longExec);
+            ExecutorCache executorCache = topoCache.get(exec);
+            //null isTimedOut means worker never reported any heartbeat
+            Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
+            Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue());
+            if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut != null && !isTimedOut))) {
+                ret.add(exec);
+            } else {
+                LOG.info("Executor {}:{} not alive", topoId, exec);
+            }
+        }
+        return ret;
+    }
+}

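The new class above becomes the single owner of heartbeat state in Nimbus. A hedged sketch of how Nimbus drives it, using only method names from the diff; the calling code shown here is illustrative, and the real call sites are in Nimbus.java below:

    import java.util.List;
    import java.util.Set;
    import org.apache.storm.daemon.nimbus.HeartbeatCache;
    import org.apache.storm.generated.Assignment;
    import org.apache.storm.generated.SupervisorWorkerHeartbeat;

    // Illustrative lifecycle; timeouts mirror NIMBUS_TASK_TIMEOUT_SECS
    // and NIMBUS_TASK_LAUNCH_SECS from the configuration.
    class HeartbeatLifecycleSketch {
        void onWorkerHeartbeat(HeartbeatCache cache, SupervisorWorkerHeartbeat hb, int taskTimeoutSecs) {
            // RPC path: fold the worker's reported beats into the cache.
            cache.updateHeartbeat(hb, taskTimeoutSecs);
        }

        void onTimerTick(HeartbeatCache cache, String topoId, int taskTimeoutSecs) {
            // No beats arrived for this topology: refresh timeout flags only.
            cache.timeoutOldHeartbeats(topoId, taskTimeoutSecs);
        }

        Set<List<Integer>> computeAlive(HeartbeatCache cache, String topoId,
                Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
            // An executor counts as alive if it was launched less than
            // taskLaunchSecs ago, or its cached heartbeat is not timed out.
            return cache.getAliveExecutors(topoId, allExecutors, assignment, taskLaunchSecs);
        }

        void onTopoRemoved(HeartbeatCache cache, String topoId) {
            cache.removeTopo(topoId);
        }
    }

The design point: each executor's state lives in its own synchronized ExecutorCache entry inside nested ConcurrentHashMaps, so concurrent heartbeat RPCs from many supervisors update disjoint entries instead of swapping one global map under an AtomicReference, which is what the old code in Nimbus.java did.
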
http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 5c48271..a33a4d0 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -190,6 +190,7 @@ import org.apache.storm.shade.com.google.common.collect.Maps;
 import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
 import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.stats.StatsUtil;
 import org.apache.storm.thrift.TException;
 import org.apache.storm.utils.BufferInputStream;
@@ -297,7 +298,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         Assignment oldAssignment = state.assignmentInfo(topoId, null);
         state.removeStorm(topoId);
         notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
-        nimbus.getHeartbeatsCache().getAndUpdate(new Dissoc<>(topoId));
+        nimbus.heartbeatsCache.removeTopo(topoId);
         nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
         return null;
     };
@@ -404,7 +405,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final Object submitLock = new Object();
     private final Object schedLock = new Object();
     private final Object credUpdateLock = new Object();
-    private final AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache;
+    private final HeartbeatCache heartbeatsCache;
     private final AtomicBoolean heartbeatsReadyFlag;
     private final IWorkerHeartbeatsRecoveryStrategy heartbeatsRecoveryStrategy;
     @SuppressWarnings("deprecation")
@@ -541,7 +542,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             stormClusterState = makeStormClusterState(conf);
         }
         this.stormClusterState = stormClusterState;
-        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
+        this.heartbeatsCache = new HeartbeatCache();
         this.heartbeatsReadyFlag = new AtomicBoolean(false);
         this.heartbeatsRecoveryStrategy = WorkerHeartbeatsRecoveryStrategyFactory.getStrategy(conf);
         this.downloaders = fileCacheMap(conf);
@@ -1464,7 +1465,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     @VisibleForTesting
-    public AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> getHeartbeatsCache() {
+    public HeartbeatCache getHeartbeatsCache() {
         return heartbeatsCache;
     }
 
@@ -1719,23 +1720,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         IStormClusterState state = stormClusterState;
         Map<List<Integer>, Map<String, Object>> executorBeats =
             StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
-        Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId),
-                                                                                                      executorBeats, allExecutors,
-                                                                                                      ObjectReader.getInt(conf.get(
-                                                                                                          DaemonConfig
-                                                                                                              .NIMBUS_TASK_TIMEOUT_SECS)));
-        heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
-    }
-
-    private void updateHeartbeats(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
-        LOG.debug("Updating heartbeats for {} {}", topoId, allExecutors);
-        Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId);
-        if (cache == null) {
-            cache = new HashMap<>();
-            heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
-        }
-        StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
-                                       null, allExecutors, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+        heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors,
+            ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
     }
 
     /**
@@ -1751,27 +1737,14 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (zkHeartbeatTopologies.contains(topoId)) {
                 updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue());
             } else {
-                updateHeartbeats(topoId, topologyToExecutors.get(topoId), entry.getValue());
+                LOG.debug("Timing out old heartbeats for {}", topoId);
+                heartbeatsCache.timeoutOldHeartbeats(topoId, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
             }
         }
     }
 
     private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
-        Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
-        String topoId = workerHeartbeat.get_storm_id();
-        Map<List<Integer>, Map<String, Object>> cache = heartbeatsCache.get().get(topoId);
-        if (cache == null) {
-            cache = new HashMap<>();
-            heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
-        }
-        Set<List<Integer>> executors = new HashSet<>();
-        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
-            executors.add(Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end()));
-        }
-
-        StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), executorBeats, executors,
-                                       ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
-
+        heartbeatsCache.updateHeartbeat(workerHeartbeat, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
     }
 
     private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) {
@@ -1812,36 +1785,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     private Set<List<Integer>> aliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment) {
-        Map<List<Integer>, Map<String, Object>> hbCache = heartbeatsCache.get().get(topoId);
-        //in case that no workers report any heartbeats yet.
-        if (null == hbCache) {
-            hbCache = new HashMap<>();
-        }
-        LOG.debug("NEW  Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
-                  topoId, allExecutors, assignment, hbCache);
-
-        int taskLaunchSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
-        Set<List<Integer>> ret = new HashSet<>();
-        Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
-
-        for (List<Integer> exec : allExecutors) {
-            List<Long> longExec = new ArrayList<Long>(exec.size());
-            for (Integer num : exec) {
-                longExec.add(num.longValue());
-            }
-
-            Long startTime = execToStartTimes.get(longExec);
-            Map<String, Object> executorCache = hbCache.get(StatsUtil.convertExecutor(longExec));
-            //null isTimedOut means worker never reported any heartbeat
-            Boolean isTimedOut = executorCache == null ? null : (Boolean) executorCache.get("is-timed-out");
-            Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue());
-            if (startTime != null && ((delta < taskLaunchSecs) || (isTimedOut != null && !isTimedOut))) {
-                ret.add(exec);
-            } else {
-                LOG.info("Executor {}:{} not alive", topoId, exec);
-            }
-        }
-        return ret;
+        return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment,
+            ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)));
     }
 
     private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
@@ -2591,7 +2536,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 rmDependencyJarsInTopology(topoId);
                 forceDeleteTopoDistDir(topoId);
                 rmTopologyKeys(topoId);
-                heartbeatsCache.getAndUpdate(new Dissoc<>(topoId));
+                heartbeatsCache.removeTopo(topoId);
                 idToExecutors.getAndUpdate(new Dissoc<>(topoId));
             }
         }
@@ -3956,7 +3901,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                     NodeInfo ni = entry.getValue();
                     ExecutorInfo execInfo = toExecInfo(entry.getKey());
                     Map<String, String> nodeToHost = common.assignment.get_node_host();
-                    Map<String, Object> heartbeat = common.beats.get(StatsUtil.convertExecutor(entry.getKey()));
+                    Map<String, Object> heartbeat = common.beats.get(ClientStatsUtil.convertExecutor(entry.getKey()));
                     if (heartbeat == null) {
                         heartbeat = Collections.emptyMap();
                     }
@@ -4675,7 +4620,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         return true;
     }
 
-    private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
+    static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
         private final K key;
         private final V value;
 
@@ -4694,7 +4639,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
 
     // Shutdownable methods
 
-    private static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
+    static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
         private final K key;
 
         public Dissoc(K key) {