You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/25 04:32:35 UTC

[storm] branch master updated: STORM-3479 HB timeout configurable on a topology level

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new e28401f  STORM-3479 HB timeout configurable on a topology level
     new 822a468  Merge pull request #3121 from dandsager1/STORM-3479
e28401f is described below

commit e28401fe1838a7e4d2edcd2b636b0ee6710b9435
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Thu Sep 5 15:10:17 2019 -0500

    STORM-3479 HB timeout configurable on a topology level
---
 conf/defaults.yaml                                 |  2 +
 storm-client/src/jvm/org/apache/storm/Config.java  | 18 +++++++
 .../main/java/org/apache/storm/DaemonConfig.java   |  4 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 62 ++++++++++++++++++----
 .../apache/storm/daemon/supervisor/Container.java  |  8 +++
 .../org/apache/storm/daemon/supervisor/Slot.java   | 37 +++++++++++--
 .../storm/daemon/supervisor/SupervisorUtils.java   |  8 ---
 .../storm/daemon/logviewer/utils/WorkerLogs.java   | 39 +++++++++++++-
 8 files changed, 152 insertions(+), 26 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e555ac8..3e9f8fc 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -171,6 +171,8 @@ supervisor.monitor.frequency.secs: 3
 supervisor.heartbeat.frequency.secs: 5
 #max timeout for a node worker heartbeats when master gains leadership
 supervisor.worker.heartbeats.max.timeout.secs: 600
+#Maximum value allowed for the topology-level worker heartbeat timeout (topology.worker.timeout.secs).
+worker.max.timeout.secs: 600
 supervisor.enable: true
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b46c112..a54faf6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1042,12 +1042,30 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
     /**
      * How long a worker can go without heartbeating before the supervisor tries to restart the worker process.
+     * Can be overridden by {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}, if set.
      */
     @IsInteger
     @IsPositiveNumber
     @NotNull
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
     /**
+     * Enforce maximum on {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    @NotNull
+    public static final String WORKER_MAX_TIMEOUT_SECS = "worker.max.timeout.secs";
+    /**
+     * Topology configurable worker heartbeat timeout before the supervisor tries to restart the worker process.
+     * Maximum value constrained by {@link #WORKER_MAX_TIMEOUT_SECS}.
+     * When topology timeout is greater, the following configs are effectively overridden:
+     * {@link #SUPERVISOR_WORKER_TIMEOUT_SECS}, SUPERVISOR_WORKER_START_TIMEOUT_SECS, NIMBUS_TASK_TIMEOUT_SECS and NIMBUS_TASK_LAUNCH_SECS.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    @NotNull
+    public static final String TOPOLOGY_WORKER_TIMEOUT_SECS = "topology.worker.timeout.secs";
+    /**
      * How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill.
      * If a worker fails to shut down gracefully within this delay, it will either suicide or be forcibly killed by the supervisor.
      */
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index def49ae..28076f4 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -186,12 +186,12 @@ public class DaemonConfig implements Validated {
 
     /**
      * How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location.
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
 
-
     /**
      * How often nimbus should wake up to check heartbeats and do reassignments. Note that if a machine ever goes down Nimbus will
      * immediately wake up and take action. This parameter is for checking for failures when there's no explicit event like that occurring.
@@ -234,6 +234,7 @@ public class DaemonConfig implements Validated {
      *
      * <p>A separate timeout exists for launch because there can be quite a bit of overhead
      * to launching new JVM's and configuring them.</p>
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
@@ -794,6 +795,7 @@ public class DaemonConfig implements Validated {
      * How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process.
      * This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the
      * JVM on launch.
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 45add73..69b2bb1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1113,6 +1113,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
         ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+
+        if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+            int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
+            if (workerTimeoutSecs > workerMaxTimeoutSecs) {
+                ret.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, workerMaxTimeoutSecs);
+                String topoId = (String) mergedConf.get(Config.STORM_ID);
+                LOG.warn("Topology {} topology.worker.timeout.secs is too large. Reducing from {} to {}",
+                    topoId, workerTimeoutSecs, workerMaxTimeoutSecs);
+            }
+        }
         return ret;
     }
 
@@ -1862,8 +1873,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         IStormClusterState state = stormClusterState;
         Map<List<Integer>, Map<String, Object>> executorBeats =
             StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
-        heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors,
-            ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+        heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors, getTopologyHeartbeatTimeoutSecs(topoId));
     }
 
     /**
@@ -1880,17 +1890,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue());
             } else {
                 LOG.debug("Timing out old heartbeats for {}", topoId);
-                heartbeatsCache.timeoutOldHeartbeats(topoId, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+                heartbeatsCache.timeoutOldHeartbeats(topoId, getTopologyHeartbeatTimeoutSecs(topoId));
             }
         }
     }
 
-    private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
-        heartbeatsCache.updateHeartbeat(workerHeartbeat, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+    private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat, int heartbeatTimeoutSecs) {
+        heartbeatsCache.updateHeartbeat(workerHeartbeat, heartbeatTimeoutSecs);
     }
 
     private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) {
-        workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
+        for (SupervisorWorkerHeartbeat hb : workerHeartbeats.get_worker_heartbeats()) {
+            String topoId = hb.get_storm_id();
+            int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoId);
+            updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
+        }
         if (!heartbeatsReadyFlag.get() && !Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
             heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
         }
@@ -1927,8 +1941,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
 
     private Set<List<Integer>> aliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment) {
-        return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment,
-            ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)));
+        return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment, getTopologyLaunchHeartbeatTimeoutSec(topoId));
     }
 
     private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
@@ -2508,6 +2521,35 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         base.set_principal((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL));
     }
 
+    // Topology may set custom heartbeat timeout; it can raise, but never lower, NIMBUS_TASK_TIMEOUT_SECS.
+    private int getTopologyHeartbeatTimeoutSecs(Map<String, Object> topoConf) {
+        int nimbusTimeoutSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+        if (!topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            // No per-topology override configured.
+            return nimbusTimeoutSecs;
+        }
+
+        int topoTimeoutSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+        return Math.max(topoTimeoutSecs, nimbusTimeoutSecs);
+    }
+
+    // Resolve the heartbeat timeout by topology id, falling back to the nimbus default if the conf cannot be read.
+    private int getTopologyHeartbeatTimeoutSecs(String topoId) {
+        try {
+            return getTopologyHeartbeatTimeoutSecs(tryReadTopoConf(topoId, topoCache));
+        } catch (Exception e) {
+            // Contain any exception; pass it as the trailing throwable so the cause is actually logged.
+            LOG.warn("Exception when getting heartbeat timeout for topology {}.", topoId, e);
+            return ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+        }
+    }
+
+    // Launch timeout is the larger of NIMBUS_TASK_LAUNCH_SECS and the topology's heartbeat timeout.
+    private int getTopologyLaunchHeartbeatTimeoutSec(String topoId) {
+        int launchTimeoutSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
+        return Math.max(launchTimeoutSecs, getTopologyHeartbeatTimeoutSecs(topoId));
+    }
+
     private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner,
                                String principal, Map<String, Object> topoConf, StormTopology stormTopology)
         throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
@@ -4705,11 +4747,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         String id = hb.get_storm_id();
         try {
             Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
-            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "sendSupervisorWorkerHeartbeat");
             if (isLeader()) {
-                updateCachedHeartbeatsFromWorker(hb);
+                int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoConf);
+                updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
             }
         } catch (Exception e) {
             LOG.warn("Send HB exception. (topology id='{}')", id, e);
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index a25faad..21ff8ec 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -49,6 +49,7 @@ import org.apache.storm.metricstore.MetricException;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -418,6 +419,13 @@ public abstract class Container implements Killable {
         }
         data.put(DaemonConfig.LOGS_USERS, logsUsers.toArray());
 
+        if (topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS) != null) {
+            int topoTimeout = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+            int defaultWorkerTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            topoTimeout = Math.max(topoTimeout, defaultWorkerTimeout);
+            data.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, topoTimeout);
+        }
+
         File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
 
         Yaml yaml = new Yaml();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index c8e6f19..7575a91 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -667,7 +667,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb != null) {
             long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
-            if (hbAgeMs <= staticState.hbTimeoutMs) {
+            long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+            if (hbAgeMs <= hbTimeoutMs) {
                 return dynamicState.withState(MachineState.RUNNING);
             }
         }
@@ -681,9 +682,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         dynamicState = updateAssignmentIfNeeded(dynamicState);
 
         long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
-        if (timeDiffms > staticState.firstHbTimeoutMs) {
+        long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
+        if (timeDiffms > hbFirstTimeoutMs) {
             LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
-                     staticState.firstHbTimeoutMs);
+                    hbFirstTimeoutMs);
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
@@ -745,8 +747,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         }
 
         long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
-        if (timeDiffMs > staticState.hbTimeoutMs) {
-            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
+        long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+        if (timeDiffMs > hbTimeoutMs) {
+            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs);
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
@@ -833,6 +836,30 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         return dynamicState.state;
     }
 
+    /*
+     * Get worker heartbeat timeout time in ms. Use topology specified timeout if provided,
+     * but never go below the supervisor-wide timeout held in staticState.
+     */
+    private static long getHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+        long hbTimeoutMs = staticState.hbTimeoutMs;
+        Map<String, Object> topoConf = dynamicState.container.topoConf;
+        if (topoConf != null && topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            // Scale with a long literal so the seconds -> ms conversion cannot overflow int arithmetic.
+            long topoHbTimeoutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) * 1000L;
+            hbTimeoutMs = Math.max(topoHbTimeoutMs, hbTimeoutMs);
+        }
+
+        return hbTimeoutMs;
+    }
+
+    /*
+     * Get worker heartbeat timeout when waiting for worker to start.
+     * If a topology specific timeout is set, ensure first heartbeat timeout >= topology specific timeout.
+     */
+    private static long getFirstHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+        return Math.max(getHbTimeoutMs(staticState, dynamicState), staticState.firstHbTimeoutMs);
+    }
+
     /**
      * Set a new assignment asynchronously.
      * @param newAssignment the new assignment for this slot to run, null to run nothing
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index a0d0397..5e4ce3f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -117,10 +117,6 @@ public class SupervisorUtils {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
     }
 
-    public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
-        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
-    }
-
     public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
@@ -143,8 +139,4 @@ public class SupervisorUtils {
             return null;
         }
     }
-
-    private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
-        return (now - whb.get_time_secs()) > ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-    }
 }
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 0d5b14a..e57b626 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -25,6 +25,7 @@ import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
 import com.codahale.metrics.Meter;
 import com.google.common.collect.Lists;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -40,11 +41,15 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.daemon.utils.PathUtil;
+import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.utils.LruMap;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.jooq.lambda.Unchecked;
@@ -64,6 +69,7 @@ public class WorkerLogs {
     private final Map<String, Object> stormConf;
     private final Path logRootDir;
     private final DirectoryCleaner directoryCleaner;
+    private final LruMap<String, Integer> mapTopologyIdToHeartbeatTimeout;
 
     /**
      * Constructor.
@@ -77,6 +83,7 @@ public class WorkerLogs {
         this.logRootDir = logRootDir.toAbsolutePath().normalize();
         this.numSetPermissionsExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_SET_PERMISSION_EXCEPTIONS);
         this.directoryCleaner = new DirectoryCleaner(metricsRegistry);
+        this.mapTopologyIdToHeartbeatTimeout = new LruMap<>(200);
     }
 
     /**
@@ -189,12 +196,40 @@ public class WorkerLogs {
      */
     public Set<String> getAliveIds(int nowSecs) throws IOException {
         return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
-                .filter(entry -> Objects.nonNull(entry.getValue())
-                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
+                .filter(entry -> Objects.nonNull(entry.getValue()) && !isTimedOut(nowSecs, entry))
                 .map(Map.Entry::getKey)
                 .collect(toCollection(TreeSet::new));
     }
 
+    // A heartbeat is timed out only once its age strictly exceeds the timeout, matching the supervisor's check.
+    private boolean isTimedOut(int nowSecs, Map.Entry<String, LSWorkerHeartbeat> entry) {
+        LSWorkerHeartbeat hb = entry.getValue();
+        return (nowSecs - hb.get_time_secs()) > getTopologyTimeout(hb);
+    }
+
+    // Look up, and remember per topology id, the heartbeat timeout that applies to the given worker heartbeat.
+    private int getTopologyTimeout(LSWorkerHeartbeat hb) {
+        String topoId = hb.get_topology_id();
+        Integer cachedTimeout = mapTopologyIdToHeartbeatTimeout.get(topoId);
+        if (cachedTimeout != null) {
+            return cachedTimeout;
+        }
+        int timeout = getWorkerLogTimeout(stormConf, topoId, hb.get_port());
+        mapTopologyIdToHeartbeatTimeout.put(topoId, timeout);
+        return timeout;
+    }
+
+    // Read the timeout from the worker's log metadata file; a missing file or entry yields the supervisor default.
+    private int getWorkerLogTimeout(Map<String, Object> conf, String topologyId, int port) {
+        int defaultWorkerLogTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(file.getAbsolutePath());
+        if (map == null) {
+            return defaultWorkerLogTimeout;
+        }
+        return ObjectReader.getInt(map.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS), defaultWorkerLogTimeout);
+    }
+
     /**
      * Finds directories for specific worker ids that can be cleaned up.
      *