You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/23 18:50:39 UTC
apex-core git commit: APEXCORE-662 Raising StramEvent in case of heartbeat miss
Repository: apex-core
Updated Branches:
refs/heads/master df8bc7e00 -> 16d1bf62d
APEXCORE-662 Raising StramEvent in case of heartbeat miss
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/16d1bf62
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/16d1bf62
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/16d1bf62
Branch: refs/heads/master
Commit: 16d1bf62d7a4c83aec1c3bdb9a8e5878fae42323
Parents: df8bc7e
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Mar 14 16:31:43 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Mar 23 22:06:53 2017 +0530
----------------------------------------------------------------------
.../datatorrent/stram/StreamingContainerManager.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/16d1bf62/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 9b0c4f4..ee07af1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -780,14 +780,22 @@ public class StreamingContainerManager implements PlanContext
//LOG.debug("{} {} {}", c.getExternalId(), currentTms - sca.createdMillis, this.vars.heartbeatTimeoutMillis);
// container allocated but process was either not launched or is not able to phone home
if (currentTms - sca.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) {
- LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis);
+ LOG.error("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis);
containerStopRequests.put(c.getExternalId(), c.getExternalId());
}
} else {
if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) {
if (!isApplicationIdle()) {
+ // Check if the heartbeat for this agent has already been missed to raise the StramEvent only once
+ if (sca.lastHeartbeatMillis != -1) {
+ String info = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
+ LOG.error(info);
+ StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), info, null);
+ stramEvent.setReason(info);
+ recordEventAsync(stramEvent);
+ sca.lastHeartbeatMillis = -1;
+ }
// request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise)
- LOG.info("Container {}@{} heartbeat timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
containerStopRequests.put(c.getExternalId(), c.getExternalId());
}
}