You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/05/10 23:58:18 UTC

samza git commit: SAMZA-871; Heart-beat mechanism between JobCoordinator and all running containers

Repository: samza
Updated Branches:
  refs/heads/master 46b1333c4 -> f6d0d6551


SAMZA-871; Heart-beat mechanism between JobCoordinator and all running containers

Author: Abhishek Shivanna <as...@linkedin.com>

Reviewers: Navina Ramesh <na...@apache.org>, Jagadish V <ja...@apache.org>

Closes #163 from abhishekshivanna/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f6d0d655
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f6d0d655
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f6d0d655

Branch: refs/heads/master
Commit: f6d0d65516aed4f37216dc44ab312cdf1e7b124a
Parents: 46b1333
Author: Abhishek Shivanna <as...@linkedin.com>
Authored: Wed May 10 16:58:14 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed May 10 16:58:14 2017 -0700

----------------------------------------------------------------------
 .../versioned/container/metrics-table.html      |   4 +
 .../container/ContainerHeartbeatClient.java     | 112 +++++++++++++++++++
 .../container/ContainerHeartbeatMonitor.java    |  75 +++++++++++++
 .../container/ContainerHeartbeatResponse.java   |  42 +++++++
 .../samza/runtime/LocalContainerRunner.java     |  43 +++++--
 .../samza/config/ShellCommandConfig.scala       |   5 +
 .../samza/coordinator/JobModelManager.scala     |   2 +-
 .../main/scala/org/apache/samza/util/Util.scala |   2 +-
 .../container/TestContainerHeartbeatClient.java |  81 ++++++++++++++
 .../TestContainerHeartbeatMonitor.java          |  63 +++++++++++
 .../samza/job/yarn/YarnContainerRunner.java     |   2 +
 .../webapp/YarnContainerHeartbeatServlet.java   |  92 +++++++++++++++
 .../job/yarn/SamzaYarnAppMasterService.scala    |   3 +-
 .../TestYarnContainerHeartbeatServlet.java      |  97 ++++++++++++++++
 .../yarn/TestSamzaYarnAppMasterService.scala    |  21 ++--
 15 files changed, 624 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/docs/learn/documentation/versioned/container/metrics-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 8d425a2..2eb46e3 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -483,6 +483,10 @@
         <td>job-healthy</td>
         <td>State indicating whether the job is healthy or not</td>
     </tr>
+    <tr>
+        <td>heartbeats-expired</td>
+        <td>Number of heartbeat requests received from containers that are not in the list of running containers</td>
+    </tr>
 
     <tr>
         <th colspan="2" class="section" id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th>

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
new file mode 100644
index 0000000..cc14948
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.stream.Collectors;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Issues a heartbeat to the coordinator and returns a
+ * {@link ContainerHeartbeatResponse}.
+ * Here's the description of the protocol between the
+ * container and the coordinator:
+ *
+ * The heartbeat request contains a <code> executionContainerId
+ * </code> that identifies the container from which the
+ * request is made. The coordinator validates the provided
+ * executionContainerId against its list of containers that should be
+ * running and returns a {@link ContainerHeartbeatResponse}.
+ *
+ * The returned {@link ContainerHeartbeatResponse#isAlive()} is
+ * <code> true </code> iff. the coordinator has determined
+ * that the container is valid and should continue running.
+ */
+public class ContainerHeartbeatClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatClient.class);
+  private static final int NUM_RETRIES = 3;
+  private static final int TIMEOUT_MS = 5000;
+  private static final int BACKOFF_MULTIPLIER = 2;
+  private final String heartbeatEndpoint;
+
+  /**
+   * @param coordinatorUrl base URL of the coordinator; the endpoint path is appended to it
+   *                       without a separator, so it is expected to end with "/"
+   * @param executionEnvContainerId id sent as the <code>executionContainerId</code> query parameter
+   */
+  public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
+    this.heartbeatEndpoint =
+        String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+  }
+
+  /**
+   * Issues a heartbeat request to the coordinator and
+   * returns the corresponding {@link ContainerHeartbeatResponse}.
+   * If the request or the parsing of the reply fails with an {@link IOException},
+   * the failure is logged and a response with <code>alive == false</code> is returned.
+   */
+  public ContainerHeartbeatResponse requestHeartbeat() {
+    ObjectMapper mapper = new ObjectMapper();
+    String reply = "";
+    try {
+      reply = httpGet(new URL(heartbeatEndpoint));
+      LOG.debug("Container Heartbeat got response {}", reply);
+      return mapper.readValue(reply, ContainerHeartbeatResponse.class);
+    } catch (IOException e) {
+      // The exception is passed as the final argument so that its stack trace is logged as well.
+      LOG.error("Error in container heart beat protocol. Query url: {} response: {}", heartbeatEndpoint, reply, e);
+    }
+    return new ContainerHeartbeatResponse(false);
+  }
+
+  /**
+   * Fetches the given URL, making up to NUM_RETRIES attempts with a growing delay between them.
+   *
+   * @param url the URL to GET
+   * @return the response body with its lines joined together
+   * @throws IOException if every attempt failed; the last failure is attached as the cause
+   */
+  String httpGet(URL url) throws IOException {
+    int delayMillis = 1000;
+    Exception lastFailure = null;
+
+    for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) {
+      HttpURLConnection conn = Util.getHttpConnection(url, TIMEOUT_MS);
+      try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
+        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+          throw new IOException(String.format("HTTP error fetching url %s. Returned status code %d", url.toString(),
+              conn.getResponseCode()));
+        }
+        return br.lines().collect(Collectors.joining());
+      } catch (Exception e) {
+        LOG.error("Error in heartbeat request", e);
+        lastFailure = e;
+      }
+      // Back off only when another attempt follows; sleeping after the final failure
+      // would merely delay reporting the error to the caller.
+      if (currentTry < NUM_RETRIES - 1) {
+        sleepUninterruptibly(delayMillis);
+        delayMillis = delayMillis * BACKOFF_MULTIPLIER;
+      }
+    }
+    throw new IOException(
+        String.format("Error fetching url: %s. Tried %d time(s).", url.toString(), NUM_RETRIES), lastFailure);
+  }
+
+  /**
+   * Sleeps for the given number of milliseconds. If the thread is interrupted while sleeping,
+   * the sleep ends early and the thread's interrupt flag is restored.
+   */
+  private void sleepUninterruptibly(int delayMillis) {
+    try {
+      Thread.sleep(delayMillis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
new file mode 100644
index 0000000..940e80f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically requests a heartbeat through a {@link ContainerHeartbeatClient} and runs a
+ * callback whenever the returned {@link ContainerHeartbeatResponse} is not alive.
+ */
+public class ContainerHeartbeatMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
+  private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final int SCHEDULE_MS = 60000;
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+  private final Runnable onContainerExpired;
+  private final ContainerHeartbeatClient containerHeartbeatClient;
+  private boolean started = false;
+
+  /**
+   * @param onContainerExpired callback run on the monitor thread when a heartbeat response is not alive
+   * @param containerHeartbeatClient client used to issue the heartbeat requests
+   */
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
+    this.onContainerExpired = onContainerExpired;
+    this.containerHeartbeatClient = containerHeartbeatClient;
+  }
+
+  /**
+   * Schedules the heartbeat check to run immediately and then every SCHEDULE_MS milliseconds.
+   * Calling this on an already started monitor only logs a warning.
+   */
+  public void start() {
+    if (started) {
+      LOG.warn("Skipping attempt to start an already started ContainerHeartbeatMonitor.");
+      return;
+    }
+    LOG.info("Starting ContainerHeartbeatMonitor");
+    scheduler.scheduleAtFixedRate(() -> {
+        try {
+          ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
+          if (!response.isAlive()) {
+            onContainerExpired.run();
+          }
+        } catch (Exception e) {
+          // An exception escaping a fixed-rate task suppresses every subsequent execution, which
+          // would silently end monitoring. Log it and let the next scheduled heartbeat proceed.
+          LOG.error("Unexpected error while processing container heartbeat", e);
+        }
+      }, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
+    started = true;
+  }
+
+  /**
+   * Shuts down the scheduler if the monitor was started; otherwise does nothing.
+   */
+  public void stop() {
+    if (started) {
+      LOG.info("Stopping ContainerHeartbeatMonitor");
+      scheduler.shutdown();
+    }
+  }
+
+  /**
+   * Creates the named threads that run the heartbeat checks.
+   */
+  private static class HeartbeatThreadFactory implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      // Daemon, so that a monitor which is never stopped cannot keep the JVM from exiting.
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
new file mode 100644
index 0000000..d402ef1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Used to represent the heartbeat response between
+ * the JobCoordinator and the containers.
+ * {@link ContainerHeartbeatResponse#isAlive()} is set to <code>true</code>
+ * iff. the heartbeat is valid.
+ */
+public class ContainerHeartbeatResponse {
+
+  // Immutable flag carried by the response; exposed through isAlive().
+  private final boolean alive;
+
+  /**
+   * @param alive value bound to the "alive" JSON property
+   */
+  public ContainerHeartbeatResponse(@JsonProperty("alive") boolean alive) {
+    this.alive = alive;
+  }
+
+  /**
+   * @return the alive flag this response was created with
+   */
+  public boolean isAlive() {
+    return alive;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index e02ee23..d690c80 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -25,6 +25,8 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
 import org.apache.samza.container.SamzaContainerExceptionHandler;
@@ -55,7 +57,9 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
   private final JobModel jobModel;
   private final String containerId;
-  private volatile Throwable containerException = null;
+  private volatile Throwable containerRunnerException = null;
+  private ContainerHeartbeatMonitor containerHeartbeatMonitor;
+  private SamzaContainer container;
 
   public LocalContainerRunner(JobModel jobModel, String containerId) {
     super(jobModel.getConfig());
@@ -68,7 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     ContainerModel containerModel = jobModel.getContainers().get(containerId);
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
-    SamzaContainer container = SamzaContainer$.MODULE$.apply(
+    container = SamzaContainer$.MODULE$.apply(
         containerModel,
         config,
         jobModel.maxChangeLogStreamPartitions,
@@ -89,14 +93,14 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
           @Override
           public void onContainerFailed(Throwable t) {
             log.info("Container Failed");
-            containerException = t;
+            containerRunnerException = t;
           }
         });
-
+    startContainerHeartbeatMonitor();
     container.run();
-
-    if (containerException != null) {
-      log.error("Container stopped with Exception. Exiting process now.", containerException);
+    stopContainerHeartbeatMonitor();
+    if (containerRunnerException != null) {
+      log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
       System.exit(1);
     }
   }
@@ -137,6 +141,29 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     MDC.put("jobId", jobId);
 
     StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
-    new LocalContainerRunner(jobModel, containerId).run(streamApp);
+    LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
+    localContainerRunner.run(streamApp);
+  }
+
+  /**
+   * Starts a {@link ContainerHeartbeatMonitor} when the execution environment container id is
+   * present in the process environment; otherwise logs a warning and leaves the monitor unset.
+   */
+  private void startContainerHeartbeatMonitor() {
+    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+    if (executionEnvContainerId != null) {
+      log.info("Got execution environment container id: {}", executionEnvContainerId);
+      containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> {
+          // Record the failure before triggering the shutdown: run() inspects containerRunnerException
+          // right after container.run() returns, so assigning it afterwards could race with that
+          // check and let the process finish without reporting the expired heartbeat.
+          containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
+          container.shutdown();
+        }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      containerHeartbeatMonitor.start();
+    } else {
+      containerHeartbeatMonitor = null;
+      log.warn("executionEnvContainerId not set. Container heartbeat monitor will not be started");
+    }
+  }
+
+  /**
+   * Stops the heartbeat monitor if one was created; does nothing when the field is null.
+   */
+  private void stopContainerHeartbeatMonitor() {
+    if (containerHeartbeatMonitor != null) {
+      containerHeartbeatMonitor.stop();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 3c0f320..caad7fd 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -45,6 +45,11 @@ object ShellCommandConfig {
    */
   val ENV_JAVA_HOME = "JAVA_HOME"
 
+  /**
+    * The ID assigned to the container by the execution environment (eg: YARN Container Id)
+    */
+  val ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID"
+
   /*
    * The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
    * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e39ea3b..dda0b6b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -156,7 +156,7 @@ object JobModelManager extends Logging {
     jobModelRef.set(jobModel)
 
     val server = new HttpServer
-    server.addServlet("/*", new JobServlet(jobModelRef))
+    server.addServlet("/", new JobServlet(jobModelRef))
     currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor)
     currentJobModelManager
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e7832a0..6c224e6 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -172,7 +172,7 @@ object Util extends Logging {
     readStream(httpConn.getInputStream)
   }
 
-  private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
     val conn = url.openConnection()
     conn.setConnectTimeout(timeout)
     conn.setReadTimeout(timeout)

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
new file mode 100644
index 0000000..6dc07f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.io.IOException;
+import java.net.URL;
+import junit.framework.Assert;
+import org.junit.Test;
+
+/**
+ * Tests how {@link ContainerHeartbeatClient#requestHeartbeat()} maps HTTP replies and
+ * failures to a {@link ContainerHeartbeatResponse}, using a client with a stubbed HTTP layer.
+ */
+public class TestContainerHeartbeatClient {
+  private final MockContainerHeartbeatClient client =
+      new MockContainerHeartbeatClient("http://fake-endpoint/", "FAKE_CONTAINER_ID");
+
+  @Test
+  public void testClientResponseForHeartbeatAlive()
+      throws IOException {
+    client.setHttpOutput("{\"alive\": true}");
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertTrue(response.isAlive());
+  }
+
+  @Test
+  public void testClientResponseForHeartbeatDead()
+      throws IOException {
+    client.setHttpOutput("{\"alive\": false}");
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertFalse(response.isAlive());
+  }
+
+  @Test
+  public void testClientResponseOnBadRequest()
+      throws IOException {
+    // A failing HTTP call must be reported as a heartbeat that is not alive.
+    client.shouldThrowException(true);
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertFalse(response.isAlive());
+  }
+
+  /**
+   * Client whose HTTP call returns a canned reply or throws, so that no network access happens.
+   * Declared static because it never uses the enclosing test instance.
+   */
+  private static class MockContainerHeartbeatClient extends ContainerHeartbeatClient {
+    private String httpOutput;
+    private boolean throwException = false;
+
+    MockContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
+      super(coordinatorUrl, executionEnvContainerId);
+    }
+
+    public void shouldThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    public void setHttpOutput(String httpOutput) {
+      this.httpOutput = httpOutput;
+    }
+
+    @Override
+    String httpGet(URL url)
+        throws IOException {
+      if (throwException) {
+        throw new IOException("Exception thrown");
+      }
+      return httpOutput;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
new file mode 100644
index 0000000..829a158
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests that {@link ContainerHeartbeatMonitor} runs its expiry callback only when the
+ * heartbeat response is not alive.
+ */
+public class TestContainerHeartbeatMonitor {
+
+  @Test
+  public void testCallbackWhenHeartbeatDead()
+      throws InterruptedException {
+    ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class);
+    when(mockClient.requestHeartbeat()).thenReturn(new ContainerHeartbeatResponse(false));
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(countDownLatch::countDown, mockClient);
+    monitor.start();
+    try {
+      // The first heartbeat is scheduled without an initial delay, so the callback should fire promptly.
+      boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
+      Assert.assertTrue(success);
+    } finally {
+      // Stop the monitor so that its scheduler thread does not outlive the test.
+      monitor.stop();
+    }
+  }
+
+  @Test
+  public void testDoesNotCallbackWhenHeartbeatAlive()
+      throws InterruptedException {
+    ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class);
+    when(client.requestHeartbeat()).thenReturn(new ContainerHeartbeatResponse(true));
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(countDownLatch::countDown, client);
+    monitor.start();
+    try {
+      boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
+      Assert.assertFalse(success);
+      Assert.assertEquals(1, countDownLatch.getCount());
+    } finally {
+      // Stop the monitor so that its scheduler thread does not outlive the test.
+      monitor.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index 84ded62..cdcf2d1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.samza.clustermanager.SamzaContainerLaunchException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.YarnConfig;
 import org.apache.samza.job.CommandBuilder;
 import org.apache.samza.util.Util;
@@ -110,6 +111,7 @@ public class YarnContainerRunner {
     log.info("Container ID {} using command {}", samzaContainerId, command);
 
     Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
+    env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
     printContainerEnvironmentVariables(samzaContainerId, env);
 
     log.info("Samza FWK path: " + command + "; env=" + env);

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
new file mode 100644
index 0000000..002f365
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.webapp;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
+import org.apache.samza.job.yarn.YarnAppState;
+import org.apache.samza.job.yarn.YarnContainer;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responds to heartbeat requests from the containers with a {@link ContainerHeartbeatResponse}.
+ * The heartbeat request contains the <code> executionContainerId </code>
+ * which in YARN's case is the YARN container Id.
+ * This servlet validates the container Id against the list
+ * of running containers maintained in the {@link YarnAppState}.
+ * The returned {@link ContainerHeartbeatResponse#isAlive()} is
+ * <code> true </code> iff. the container Id exists in {@link YarnAppState#runningYarnContainers}.
+ */
+public class YarnContainerHeartbeatServlet extends HttpServlet {
+
+  private static final String YARN_CONTAINER_ID = "executionContainerId";
+  private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
+  private static final String APPLICATION_JSON = "application/json";
+  private static final String GROUP = SamzaAppMasterMetrics.class.getName();
+  private final Counter heartbeatsExpiredCount;
+
+  private YarnAppState yarnAppState;
+  private ObjectMapper mapper;
+
+  public YarnContainerHeartbeatServlet(YarnAppState yarnAppState, ReadableMetricsRegistry registry) {
+    this.yarnAppState = yarnAppState;
+    this.mapper = new ObjectMapper();
+    this.heartbeatsExpiredCount = registry.newCounter(GROUP, "heartbeats-expired");
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    ContainerId yarnContainerId;
+    PrintWriter printWriter = resp.getWriter();
+    String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+    ContainerHeartbeatResponse response;
+    resp.setContentType(APPLICATION_JSON);
+    boolean alive = false;
+    try {
+      yarnContainerId = ContainerId.fromString(containerIdParam);
+      for (YarnContainer yarnContainer : yarnAppState.runningYarnContainers.values()) {
+        if (yarnContainer.id().compareTo(yarnContainerId) == 0) {
+          alive = true;
+          break;
+        }
+      }
+      if (!alive) {
+        heartbeatsExpiredCount.inc();
+      }
+      response = new ContainerHeartbeatResponse(alive);
+      printWriter.write(mapper.writeValueAsString(response));
+    } catch (IllegalArgumentException e) {
+      LOG.error("Container ID {} passed is invalid", containerIdParam);
+      resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index 5f2bfc5..f436f79 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -28,7 +28,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.util.Logging
-import org.apache.samza.webapp.{ApplicationMasterWebServlet, ApplicationMasterRestServlet}
+import org.apache.samza.webapp.{ApplicationMasterRestServlet, ApplicationMasterWebServlet, YarnContainerHeartbeatServlet}
 
 /**
   * Samza's application master runs a very basic HTTP/JSON service to allow
@@ -56,6 +56,7 @@ class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationS
     webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
     webApp.start
 
+    samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
     samzaAppState.jobModelManager.start
     state.rpcUrl = rpcApp.getUrl
     state.trackingUrl = webApp.getUrl

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
new file mode 100644
index 0000000..d6fc254
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.webapp;
+
+import java.io.IOException;
+import java.net.URL;
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.yarn.YarnAppState;
+import org.apache.samza.job.yarn.YarnContainer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises {@link YarnContainerHeartbeatServlet} over HTTP by mounting it on a locally
+ * started {@link HttpServer} and reading the JSON heartbeat response.
+ */
+public class TestYarnContainerHeartbeatServlet {
+
+  private YarnContainer container;
+  private YarnAppState yarnAppState;
+  private HttpServer webApp;
+  private ObjectMapper mapper;
+
+  private ContainerHeartbeatResponse heartbeat;
+
+  /** Starts a server on an ephemeral port with the heartbeat servlet mounted at the root. */
+  @Before
+  public void setup()
+      throws Exception {
+    mapper = new ObjectMapper();
+    container = mock(YarnContainer.class);
+    yarnAppState =
+        new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 1);
+
+    ReadableMetricsRegistry registry = new MetricsRegistryMap("test-registry");
+    webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet()));
+    webApp.addServlet("/", new YarnContainerHeartbeatServlet(yarnAppState, registry));
+    webApp.start();
+  }
+
+  /** Stops the server started in {@link #setup()}. */
+  @After
+  public void cleanup()
+      throws Exception {
+    webApp.stop();
+  }
+
+  /** A heartbeat for a container registered as running must be reported alive. */
+  @Test
+  public void testContainerHeartbeatWhenValid()
+      throws IOException {
+    String runningContainerId = "container_1350670447861_0003_01_000002";
+    registerRunningContainer(runningContainerId);
+    heartbeat = requestHeartbeat(runningContainerId);
+    Assert.assertTrue(heartbeat.isAlive());
+  }
+
+  /** A heartbeat for a well-formed ID that is not registered as running must be reported dead. */
+  @Test
+  public void testContainerHeartbeatWhenInvalid()
+      throws IOException {
+    String runningContainerId = "container_1350670447861_0003_01_000003";
+    String unknownContainerId = "container_1350670447861_0003_01_000002";
+    registerRunningContainer(runningContainerId);
+    heartbeat = requestHeartbeat(unknownContainerId);
+    Assert.assertFalse(heartbeat.isAlive());
+  }
+
+  /** Stubs the mock container's ID and records it as running in the application state. */
+  private void registerRunningContainer(String containerId) {
+    when(container.id()).thenReturn(ConverterUtils.toContainerId(containerId));
+    yarnAppState.runningYarnContainers.put(containerId, container);
+  }
+
+  /** Issues a heartbeat GET for the given container ID and parses the JSON reply. */
+  private ContainerHeartbeatResponse requestHeartbeat(String containerId)
+      throws IOException {
+    URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + containerId);
+    String body = Util.read(url, 1000);
+    return mapper.readValue(body, ContainerHeartbeatResponse.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 65c03d1..73c7f49 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -19,19 +19,20 @@
 
 package org.apache.samza.job.yarn
 
-import java.io.BufferedReader
+import java.io.{BufferedReader, InputStreamReader}
 import java.net.URL
-import java.io.InputStreamReader
+
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.clustermanager.SamzaApplicationState
-import org.apache.samza.config.MapConfig
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConverters._
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.metrics._
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 class TestSamzaYarnAppMasterService {
 
@@ -40,9 +41,10 @@ class TestSamzaYarnAppMasterService {
     val config = getDummyConfig
     val jobModelManager = JobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
+    val registry = new MetricsRegistryMap()
 
     val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
-    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null)
     val taskName = new TaskName("test")
 
     // start the dashboard
@@ -75,8 +77,9 @@ class TestSamzaYarnAppMasterService {
     val jobModelManager = JobModelManager(config)
     val samzaState = new SamzaApplicationState(jobModelManager)
     val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
+    val registry = new MetricsRegistryMap()
 
-    val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+    val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null)
 
     // start the dashboard
     service.onInit