You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2019/09/25 04:32:35 UTC
[storm] branch master updated: STORM-3479 HB timeout configurable
on a topology level
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new e28401f STORM-3479 HB timeout configurable on a topology level
new 822a468 Merge pull request #3121 from dandsager1/STORM-3479
e28401f is described below
commit e28401fe1838a7e4d2edcd2b636b0ee6710b9435
Author: dandsager <da...@verizonmedia.com>
AuthorDate: Thu Sep 5 15:10:17 2019 -0500
STORM-3479 HB timeout configurable on a topology level
---
conf/defaults.yaml | 2 +
storm-client/src/jvm/org/apache/storm/Config.java | 18 +++++++
.../main/java/org/apache/storm/DaemonConfig.java | 4 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 62 ++++++++++++++++++----
.../apache/storm/daemon/supervisor/Container.java | 8 +++
.../org/apache/storm/daemon/supervisor/Slot.java | 37 +++++++++++--
.../storm/daemon/supervisor/SupervisorUtils.java | 8 ---
.../storm/daemon/logviewer/utils/WorkerLogs.java | 39 +++++++++++++-
8 files changed, 152 insertions(+), 26 deletions(-)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e555ac8..3e9f8fc 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -171,6 +171,8 @@ supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
#max timeout for a node worker heartbeats when master gains leadership
supervisor.worker.heartbeats.max.timeout.secs: 600
+#Upper bound enforced on the per-topology worker heartbeat timeout (topology.worker.timeout.secs).
+worker.max.timeout.secs: 600
supervisor.enable: true
supervisor.supervisors: []
supervisor.supervisors.commands: []
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b46c112..a54faf6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1042,12 +1042,30 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
/**
* How long a worker can go without heartbeating before the supervisor tries to restart the worker process.
+ * Can be overridden by {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}, if set.
*/
@IsInteger
@IsPositiveNumber
@NotNull
public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
/**
+ * Enforce maximum on {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ @NotNull
+ public static final String WORKER_MAX_TIMEOUT_SECS = "worker.max.timeout.secs";
+ /**
+ * Topology configurable worker heartbeat timeout before the supervisor tries to restart the worker process.
+ * Maximum value constrained by {@link #WORKER_MAX_TIMEOUT_SECS}.
+ * When topology timeout is greater, the following configs are effectively overridden:
+ * {@link #SUPERVISOR_WORKER_TIMEOUT_SECS}, SUPERVISOR_WORKER_START_TIMEOUT_SECS, NIMBUS_TASK_TIMEOUT_SECS and NIMBUS_TASK_LAUNCH_SECS.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ @NotNull
+ public static final String TOPOLOGY_WORKER_TIMEOUT_SECS = "topology.worker.timeout.secs";
+ /**
* How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill.
* If a worker fails to shut down gracefully within this delay, it will either suicide or be forcibly killed by the supervisor.
*/
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index def49ae..28076f4 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -186,12 +186,12 @@ public class DaemonConfig implements Validated {
/**
* How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location.
+ * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
-
/**
* How often nimbus should wake up to check heartbeats and do reassignments. Note that if a machine ever goes down Nimbus will
* immediately wake up and take action. This parameter is for checking for failures when there's no explicit event like that occurring.
@@ -234,6 +234,7 @@ public class DaemonConfig implements Validated {
*
* <p>A separate timeout exists for launch because there can be quite a bit of overhead
* to launching new JVM's and configuring them.</p>
+ * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
@@ -794,6 +795,7 @@ public class DaemonConfig implements Validated {
* How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process.
* This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the
* JVM on launch.
+ * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
*/
@IsInteger
@IsPositiveNumber
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 45add73..69b2bb1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1113,6 +1113,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+
+ if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+ int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+ int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
+ if (workerTimeoutSecs > workerMaxTimeoutSecs) {
+ ret.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, workerMaxTimeoutSecs);
+ String topoId = (String) mergedConf.get(Config.STORM_ID);
+ LOG.warn("Topology {} topology.worker.timeout.secs is too large. Reducing from {} to {}",
+ topoId, workerTimeoutSecs, workerMaxTimeoutSecs);
+ }
+ }
return ret;
}
@@ -1862,8 +1873,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
Map<List<Integer>, Map<String, Object>> executorBeats =
StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
- heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors,
- ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+ heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors, getTopologyHeartbeatTimeoutSecs(topoId));
}
/**
@@ -1880,17 +1890,21 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue());
} else {
LOG.debug("Timing out old heartbeats for {}", topoId);
- heartbeatsCache.timeoutOldHeartbeats(topoId, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+ heartbeatsCache.timeoutOldHeartbeats(topoId, getTopologyHeartbeatTimeoutSecs(topoId));
}
}
}
- private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
- heartbeatsCache.updateHeartbeat(workerHeartbeat, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+ private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat, int heartbeatTimeoutSecs) {
+ heartbeatsCache.updateHeartbeat(workerHeartbeat, heartbeatTimeoutSecs);
}
private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) {
- workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
+ for (SupervisorWorkerHeartbeat hb : workerHeartbeats.get_worker_heartbeats()) {
+ String topoId = hb.get_storm_id();
+ int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoId);
+ updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
+ }
if (!heartbeatsReadyFlag.get() && !Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
}
@@ -1927,8 +1941,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private Set<List<Integer>> aliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment) {
- return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment,
- ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)));
+ return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment, getTopologyLaunchHeartbeatTimeoutSec(topoId));
}
private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
@@ -2508,6 +2521,35 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
base.set_principal((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL));
}
+ // Resolves the nimbus heartbeat timeout (secs) for a topology conf: nimbus.task.timeout.secs,
+ // raised to topology.worker.timeout.secs when the topology sets a larger value.
+ private int getTopologyHeartbeatTimeoutSecs(Map<String, Object> topoConf) {
+     final int nimbusTimeoutSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+     if (!topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+         return nimbusTimeoutSecs;
+     }
+
+     final int requestedSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+     return Math.max(requestedSecs, nimbusTimeoutSecs);
+ }
+
+ // Heartbeat timeout (secs) for a topology id; falls back to nimbus.task.timeout.secs if its conf cannot be read.
+ private int getTopologyHeartbeatTimeoutSecs(String topoId) {
+     try {
+         return getTopologyHeartbeatTimeoutSecs(tryReadTopoConf(topoId, topoCache));
+     } catch (Exception e) {
+         // Contain any exception so heartbeat handling keeps going; pass the throwable itself so its detail is logged.
+         LOG.warn("Exception when getting heartbeat timeout for topology {}.", topoId, e);
+         return ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+     }
+ }
+
+ private int getTopologyLaunchHeartbeatTimeoutSec(String topoId) {
+     // The launch timeout is never shorter than the topology's regular heartbeat timeout.
+     final int launchSecs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
+     return Math.max(launchSecs, getTopologyHeartbeatTimeoutSecs(topoId));
+ }
+
private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner,
String principal, Map<String, Object> topoConf, StormTopology stormTopology)
throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
@@ -4705,11 +4747,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
String id = hb.get_storm_id();
try {
Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
- topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "sendSupervisorWorkerHeartbeat");
if (isLeader()) {
- updateCachedHeartbeatsFromWorker(hb);
+ int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoConf);
+ updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
}
} catch (Exception e) {
LOG.warn("Send HB exception. (topology id='{}')", id, e);
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index a25faad..21ff8ec 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -49,6 +49,7 @@ import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
@@ -418,6 +419,13 @@ public abstract class Container implements Killable {
}
data.put(DaemonConfig.LOGS_USERS, logsUsers.toArray());
+ if (topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS) != null) {
+ int topoTimeout = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+ int defaultWorkerTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+ topoTimeout = Math.max(topoTimeout, defaultWorkerTimeout);
+ data.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, topoTimeout);
+ }
+
File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
Yaml yaml = new Yaml();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index c8e6f19..7575a91 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -667,7 +667,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb != null) {
long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
- if (hbAgeMs <= staticState.hbTimeoutMs) {
+ long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+ if (hbAgeMs <= hbTimeoutMs) {
return dynamicState.withState(MachineState.RUNNING);
}
}
@@ -681,9 +682,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
dynamicState = updateAssignmentIfNeeded(dynamicState);
long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
- if (timeDiffms > staticState.firstHbTimeoutMs) {
+ long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
+ if (timeDiffms > hbFirstTimeoutMs) {
LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
- staticState.firstHbTimeoutMs);
+ hbFirstTimeoutMs);
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
}
@@ -745,8 +747,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
}
long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
- if (timeDiffMs > staticState.hbTimeoutMs) {
- LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
+ long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+ if (timeDiffMs > hbTimeoutMs) {
+ LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs);
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
}
@@ -833,6 +836,30 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
return dynamicState.state;
}
+ /*
+  * Get worker heartbeat timeout in ms: the larger of staticState.hbTimeoutMs and the topology specified timeout, if provided.
+  */
+ private static long getHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+     long hbTimeoutMs = staticState.hbTimeoutMs;
+     Map<String, Object> topoConf = dynamicState.container.topoConf;
+
+     if (topoConf != null && topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+         // Multiply as long (1000L): an int product would overflow before being widened for very large timeouts.
+         long topoHbTimeoutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) * 1000L;
+         hbTimeoutMs = Math.max(topoHbTimeoutMs, hbTimeoutMs);
+     }
+
+     return hbTimeoutMs;
+ }
+
+ /*
+  * Heartbeat timeout in ms while waiting for a worker to start; never less than the regular (possibly topology specific) timeout.
+  */
+ private static long getFirstHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+     long regularHbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+     return Math.max(regularHbTimeoutMs, staticState.firstHbTimeoutMs);
+ }
+
/**
* Set a new assignment asynchronously.
* @param newAssignment the new assignment for this slot to run, null to run nothing
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index a0d0397..5e4ce3f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -117,10 +117,6 @@ public class SupervisorUtils {
return _instance.readWorkerHeartbeatImpl(conf, workerId);
}
- public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
- return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
- }
-
public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
@@ -143,8 +139,4 @@ public class SupervisorUtils {
return null;
}
}
-
- private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
- return (now - whb.get_time_secs()) > ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
- }
}
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 0d5b14a..e57b626 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -25,6 +25,7 @@ import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
import com.codahale.metrics.Meter;
import com.google.common.collect.Lists;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -40,11 +41,15 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.daemon.utils.PathUtil;
+import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.utils.LruMap;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.jooq.lambda.Unchecked;
@@ -64,6 +69,7 @@ public class WorkerLogs {
private final Map<String, Object> stormConf;
private final Path logRootDir;
private final DirectoryCleaner directoryCleaner;
+ private final LruMap<String, Integer> mapTopologyIdToHeartbeatTimeout;
/**
* Constructor.
@@ -77,6 +83,7 @@ public class WorkerLogs {
this.logRootDir = logRootDir.toAbsolutePath().normalize();
this.numSetPermissionsExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_SET_PERMISSION_EXCEPTIONS);
this.directoryCleaner = new DirectoryCleaner(metricsRegistry);
+ this.mapTopologyIdToHeartbeatTimeout = new LruMap<>(200);
}
/**
@@ -189,12 +196,40 @@ public class WorkerLogs {
*/
public Set<String> getAliveIds(int nowSecs) throws IOException {
return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
- .filter(entry -> Objects.nonNull(entry.getValue())
- && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
+ .filter(entry -> Objects.nonNull(entry.getValue()) && !isTimedOut(nowSecs, entry))
.map(Map.Entry::getKey)
.collect(toCollection(TreeSet::new));
}
+ private boolean isTimedOut(int nowSecs, Map.Entry<String, LSWorkerHeartbeat> entry) {
+     LSWorkerHeartbeat heartbeat = entry.getValue();
+     // Timed out once the heartbeat age reaches the topology's timeout (the bound is inclusive).
+     return (nowSecs - heartbeat.get_time_secs()) >= getTopologyTimeout(heartbeat);
+ }
+
+ // Heartbeat timeout (secs) for the heartbeat's topology, cached per topology id after the first lookup.
+ private int getTopologyTimeout(LSWorkerHeartbeat hb) {
+     String topologyId = hb.get_topology_id();
+     Integer known = mapTopologyIdToHeartbeatTimeout.get(topologyId);
+     if (known != null) {
+         return known;
+     }
+     int resolved = getWorkerLogTimeout(stormConf, topologyId, hb.get_port());
+     mapTopologyIdToHeartbeatTimeout.put(topologyId, resolved);
+     return resolved;
+ }
+
+ // Timeout (secs) from the worker's log metadata file, else supervisor.worker.timeout.secs; read via ObjectReader.getInt, not a raw cast.
+ private int getWorkerLogTimeout(Map<String, Object> conf, String topologyId, int port) {
+     int defaultWorkerLogTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+     File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
+     Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(file.getAbsolutePath());
+     if (map == null) {
+         return defaultWorkerLogTimeout;
+     }
+     return ObjectReader.getInt(map.getOrDefault(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, defaultWorkerLogTimeout));
+ }
+
/**
* Finds directories for specific worker ids that can be cleaned up.
*