You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/03/23 18:50:39 UTC

apex-core git commit: APEXCORE-662 Raising StramEvent in case of heartbeat miss

Repository: apex-core
Updated Branches:
  refs/heads/master df8bc7e00 -> 16d1bf62d


APEXCORE-662 Raising StramEvent in case of heartbeat miss


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/16d1bf62
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/16d1bf62
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/16d1bf62

Branch: refs/heads/master
Commit: 16d1bf62d7a4c83aec1c3bdb9a8e5878fae42323
Parents: df8bc7e
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Tue Mar 14 16:31:43 2017 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Thu Mar 23 22:06:53 2017 +0530

----------------------------------------------------------------------
 .../datatorrent/stram/StreamingContainerManager.java    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/16d1bf62/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 9b0c4f4..ee07af1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -780,14 +780,22 @@ public class StreamingContainerManager implements PlanContext
           //LOG.debug("{} {} {}", c.getExternalId(), currentTms - sca.createdMillis, this.vars.heartbeatTimeoutMillis);
           // container allocated but process was either not launched or is not able to phone home
           if (currentTms - sca.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) {
-            LOG.info("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis);
+            LOG.error("Container {}@{} startup timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.createdMillis);
             containerStopRequests.put(c.getExternalId(), c.getExternalId());
           }
         } else {
           if (currentTms - sca.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis) {
             if (!isApplicationIdle()) {
+              // Raise the StramEvent only once per missed heartbeat: lastHeartbeatMillis is set to -1 below once the event has been recorded
+              if (sca.lastHeartbeatMillis != -1) {
+                String info = String.format("Container %s@%s heartbeat timeout (%d%n ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
+                LOG.error(info);
+                StramEvent stramEvent = new StramEvent.ContainerErrorEvent(c.getExternalId(), info, null);
+                stramEvent.setReason(info);
+                recordEventAsync(stramEvent);
+                sca.lastHeartbeatMillis = -1;
+              }
               // request stop (kill) as process may still be hanging around (would have been detected by Yarn otherwise)
-              LOG.info("Container {}@{} heartbeat timeout ({} ms).", c.getExternalId(), c.host, currentTms - sca.lastHeartbeatMillis);
               containerStopRequests.put(c.getExternalId(), c.getExternalId());
             }
           }