You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/05/10 23:58:18 UTC
samza git commit: SAMZA-871;
Heart-beat mechanism between JobCoordinator and all running containers
Repository: samza
Updated Branches:
refs/heads/master 46b1333c4 -> f6d0d6551
SAMZA-871; Heart-beat mechanism between JobCoordinator and all running containers
Author: Abhishek Shivanna <as...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>, Jagadish V <ja...@apache.org>
Closes #163 from abhishekshivanna/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f6d0d655
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f6d0d655
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f6d0d655
Branch: refs/heads/master
Commit: f6d0d65516aed4f37216dc44ab312cdf1e7b124a
Parents: 46b1333
Author: Abhishek Shivanna <as...@linkedin.com>
Authored: Wed May 10 16:58:14 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed May 10 16:58:14 2017 -0700
----------------------------------------------------------------------
.../versioned/container/metrics-table.html | 4 +
.../container/ContainerHeartbeatClient.java | 112 +++++++++++++++++++
.../container/ContainerHeartbeatMonitor.java | 75 +++++++++++++
.../container/ContainerHeartbeatResponse.java | 42 +++++++
.../samza/runtime/LocalContainerRunner.java | 43 +++++--
.../samza/config/ShellCommandConfig.scala | 5 +
.../samza/coordinator/JobModelManager.scala | 2 +-
.../main/scala/org/apache/samza/util/Util.scala | 2 +-
.../container/TestContainerHeartbeatClient.java | 81 ++++++++++++++
.../TestContainerHeartbeatMonitor.java | 63 +++++++++++
.../samza/job/yarn/YarnContainerRunner.java | 2 +
.../webapp/YarnContainerHeartbeatServlet.java | 92 +++++++++++++++
.../job/yarn/SamzaYarnAppMasterService.scala | 3 +-
.../TestYarnContainerHeartbeatServlet.java | 97 ++++++++++++++++
.../yarn/TestSamzaYarnAppMasterService.scala | 21 ++--
15 files changed, 624 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/docs/learn/documentation/versioned/container/metrics-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 8d425a2..2eb46e3 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -483,6 +483,10 @@
<td>job-healthy</td>
<td>State indicating whether the job is healthy or not</td>
</tr>
+ <tr>
+ <td>heartbeats-expired</td>
+ <td>Number of heartbeat requests received from containers that are no longer valid (i.e. not among the currently running containers)</td>
+ </tr>
<tr>
<th colspan="2" class="section" id="kafka-system-consumer-metrics">org.apache.samza.system.kafka.KafkaSystemConsumerMetrics</th>
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
new file mode 100644
index 0000000..cc14948
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Collectors;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Issues heartbeats to the coordinator and returns the resulting
+ * {@link ContainerHeartbeatResponse}.
+ *
+ * <p>Protocol between the container and the coordinator: each heartbeat is an HTTP GET to
+ * {@code <coordinatorUrl>containerHeartbeat?executionContainerId=<id>}, where the id identifies
+ * the container making the request. The coordinator validates the id against its list of
+ * containers that should be running and replies with a JSON-serialized
+ * {@link ContainerHeartbeatResponse}.
+ *
+ * <p>{@link ContainerHeartbeatResponse#isAlive()} is {@code true} iff the coordinator has
+ * determined that the container is valid and should continue running. Failing to reach the
+ * coordinator, or to parse its reply, is reported as not alive.
+ */
+public class ContainerHeartbeatClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatClient.class);
+  // ObjectMapper is thread-safe once configured, so one shared instance serves every heartbeat.
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final int NUM_RETRIES = 3;
+  private static final int TIMEOUT_MS = 5000;
+  private static final int INITIAL_BACKOFF_MS = 1000;
+  private static final int BACKOFF_MULTIPLIER = 2;
+  private final String heartbeatEndpoint;
+
+  /**
+   * @param coordinatorUrl base URL of the coordinator; the heartbeat path is appended verbatim,
+   *                       so it is expected to end with '/'
+   * @param executionEnvContainerId id assigned to this container by the execution environment
+   */
+  public ContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
+    this.heartbeatEndpoint =
+        String.format("%scontainerHeartbeat?executionContainerId=%s", coordinatorUrl, executionEnvContainerId);
+  }
+
+  /**
+   * Issues a heartbeat request to the coordinator and returns the corresponding
+   * {@link ContainerHeartbeatResponse}.
+   *
+   * @return the coordinator's response, or a response whose {@code isAlive()} is {@code false}
+   *         if the coordinator could not be reached or its reply could not be parsed
+   */
+  public ContainerHeartbeatResponse requestHeartbeat() {
+    String reply = "";
+    try {
+      reply = httpGet(new URL(heartbeatEndpoint));
+      LOG.debug("Container Heartbeat got response {}", reply);
+      return MAPPER.readValue(reply, ContainerHeartbeatResponse.class);
+    } catch (IOException e) {
+      // Passing the exception as the last argument makes SLF4J log its stack trace.
+      LOG.error("Error in container heart beat protocol. Query url: {} response: {}", heartbeatEndpoint, reply, e);
+    }
+    return new ContainerHeartbeatResponse(false);
+  }
+
+  /**
+   * Fetches {@code url} with an HTTP GET, making up to {@code NUM_RETRIES} attempts with
+   * exponential backoff between them.
+   *
+   * @param url the URL to fetch
+   * @return the response body, with its lines concatenated without separators
+   * @throws IOException if every attempt fails; the last failure is attached as the cause
+   */
+  String httpGet(URL url) throws IOException {
+    int delayMillis = INITIAL_BACKOFF_MS;
+    Exception lastFailure = null;
+
+    for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) {
+      if (currentTry > 0) {
+        // Back off only between attempts; sleeping after the final attempt would only delay the failure.
+        sleepUninterruptibly(delayMillis);
+        delayMillis = delayMillis * BACKOFF_MULTIPLIER;
+      }
+      HttpURLConnection conn = null;
+      try {
+        // Opened inside the try so that connection-setup failures are retried as well.
+        conn = Util.getHttpConnection(url, TIMEOUT_MS);
+        // Check the status before opening the body stream, since getInputStream() throws on error statuses.
+        int statusCode = conn.getResponseCode();
+        if (statusCode != HttpURLConnection.HTTP_OK) {
+          throw new IOException(String.format("HTTP error fetching url %s. Returned status code %d", url.toString(),
+              statusCode));
+        }
+        try (BufferedReader br = new BufferedReader(
+            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+          return br.lines().collect(Collectors.joining());
+        }
+      } catch (Exception e) {
+        lastFailure = e;
+        LOG.error("Error in heartbeat request", e);
+      } finally {
+        // Release the connection's resources, including on error paths where the body is never read.
+        if (conn != null) {
+          conn.disconnect();
+        }
+      }
+    }
+    throw new IOException(String.format("Error fetching url: %s. Tried %d time(s).", url.toString(), NUM_RETRIES),
+        lastFailure);
+  }
+
+  /**
+   * Sleeps for {@code delayMillis}. If interrupted, returns early and restores the thread's
+   * interrupt flag instead of propagating the {@link InterruptedException}.
+   */
+  private void sleepUninterruptibly(int delayMillis) {
+    try {
+      Thread.sleep(delayMillis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
new file mode 100644
index 0000000..940e80f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Periodically sends heartbeats through a {@link ContainerHeartbeatClient} and runs a callback
+ * whenever the coordinator reports that this container is no longer alive.
+ *
+ * <p>The first heartbeat is sent as soon as {@link #start()} is called and then every
+ * {@code SCHEDULE_MS} milliseconds, on a single daemon thread.
+ */
+public class ContainerHeartbeatMonitor {
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
+  private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
+  private static final int SCHEDULE_MS = 60000;
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+  private final Runnable onContainerExpired;
+  private final ContainerHeartbeatClient containerHeartbeatClient;
+  private boolean started = false;
+
+  /**
+   * @param onContainerExpired run on the heartbeat thread each time a heartbeat reports the container as not alive
+   * @param containerHeartbeatClient client used to issue each heartbeat request
+   */
+  public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
+    this.onContainerExpired = onContainerExpired;
+    this.containerHeartbeatClient = containerHeartbeatClient;
+  }
+
+  /**
+   * Starts the periodic heartbeat. Calling this on an already started monitor only logs a warning.
+   */
+  public void start() {
+    if (started) {
+      LOG.warn("Skipping attempt to start an already started ContainerHeartbeatMonitor.");
+      return;
+    }
+    LOG.info("Starting ContainerHeartbeatMonitor");
+    scheduler.scheduleAtFixedRate(this::heartbeat, 0, SCHEDULE_MS, TimeUnit.MILLISECONDS);
+    started = true;
+  }
+
+  /**
+   * Stops scheduling heartbeats if the monitor was started. A heartbeat already in progress is
+   * allowed to finish.
+   */
+  public void stop() {
+    if (started) {
+      LOG.info("Stopping ContainerHeartbeatMonitor");
+      scheduler.shutdown();
+    }
+  }
+
+  /**
+   * Sends a single heartbeat and runs the expiry callback if the container is not alive.
+   * Exceptions are logged rather than rethrown: an exception escaping a
+   * {@code scheduleAtFixedRate} task would silently cancel every subsequent heartbeat.
+   */
+  private void heartbeat() {
+    try {
+      ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
+      if (!response.isAlive()) {
+        onContainerExpired.run();
+      }
+    } catch (Exception e) {
+      LOG.error("Error while processing container heartbeat", e);
+    }
+  }
+
+  /**
+   * Creates uniquely named daemon threads, so a monitor that is never stopped cannot keep the
+   * JVM alive on its own.
+   */
+  private static class HeartbeatThreadFactory implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + ContainerHeartbeatMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
new file mode 100644
index 0000000..d402ef1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Heartbeat response exchanged between the JobCoordinator and the containers, sent as JSON of
+ * the form {@code {"alive": true}}.
+ * {@link ContainerHeartbeatResponse#isAlive()} is <code>true</code> iff the coordinator considers
+ * the requesting container valid, i.e. the container should keep running.
+ */
+public class ContainerHeartbeatResponse {
+ // Whether the coordinator still considers the requesting container valid.
+ private final boolean alive;
+ // @JsonProperty binds the "alive" JSON field to this parameter when Jackson deserializes.
+ public ContainerHeartbeatResponse(@JsonProperty("alive") boolean alive) {
+ this.alive = alive;
+ }
+ // Also the accessor from which Jackson emits the "alive" property when serializing.
+ public boolean isAlive() {
+ return alive;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index e02ee23..d690c80 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -25,6 +25,8 @@ import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatMonitor;
import org.apache.samza.container.SamzaContainer;
import org.apache.samza.container.SamzaContainer$;
import org.apache.samza.container.SamzaContainerExceptionHandler;
@@ -55,7 +57,9 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
private final JobModel jobModel;
private final String containerId;
- private volatile Throwable containerException = null;
+ private volatile Throwable containerRunnerException = null;
+ private ContainerHeartbeatMonitor containerHeartbeatMonitor;
+ private SamzaContainer container;
public LocalContainerRunner(JobModel jobModel, String containerId) {
super(jobModel.getConfig());
@@ -68,7 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
ContainerModel containerModel = jobModel.getContainers().get(containerId);
Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
- SamzaContainer container = SamzaContainer$.MODULE$.apply(
+ container = SamzaContainer$.MODULE$.apply(
containerModel,
config,
jobModel.maxChangeLogStreamPartitions,
@@ -89,14 +93,14 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
@Override
public void onContainerFailed(Throwable t) {
log.info("Container Failed");
- containerException = t;
+ containerRunnerException = t;
}
});
-
+ startContainerHeartbeatMonitor();
container.run();
-
- if (containerException != null) {
- log.error("Container stopped with Exception. Exiting process now.", containerException);
+ stopContainerHeartbeatMonitor();
+ if (containerRunnerException != null) {
+ log.error("Container stopped with Exception. Exiting process now.", containerRunnerException);
System.exit(1);
}
}
@@ -137,6 +141,42 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
MDC.put("jobId", jobId);
StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
- new LocalContainerRunner(jobModel, containerId).run(streamApp);
+ LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
+ localContainerRunner.run(streamApp);
+ }
+
+  /**
+   * Starts a {@link ContainerHeartbeatMonitor} when both the coordinator URL and the execution
+   * environment container id are present in the environment. If the coordinator reports this
+   * container as expired, the container is shut down and the process exits with status 1 once
+   * {@code container.run()} returns.
+   */
+  private void startContainerHeartbeatMonitor() {
+    String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+    String executionEnvContainerId = System.getenv(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID());
+    // Both values are required: without a coordinator URL every heartbeat fails, which would be
+    // indistinguishable from an expired heartbeat and shut the container down.
+    if (executionEnvContainerId != null && coordinatorUrl != null) {
+      log.info("Got execution environment container id: {}", executionEnvContainerId);
+      containerHeartbeatMonitor = new ContainerHeartbeatMonitor(() -> {
+          // Record the failure before shutting down so it is already visible when container.run()
+          // returns and run() checks containerRunnerException.
+          containerRunnerException = new SamzaException("Container shutdown due to expired heartbeat");
+          container.shutdown();
+        }, new ContainerHeartbeatClient(coordinatorUrl, executionEnvContainerId));
+      containerHeartbeatMonitor.start();
+    } else {
+      containerHeartbeatMonitor = null;
+      log.warn("executionEnvContainerId or coordinatorUrl not set. Container heartbeat monitor will not be started");
+    }
+  }
+
+  /**
+   * Stops the heartbeat monitor if one was started; otherwise does nothing.
+   */
+  private void stopContainerHeartbeatMonitor() {
+    if (containerHeartbeatMonitor != null) {
+      containerHeartbeatMonitor.stop();
+    }
 }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 3c0f320..caad7fd 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -45,6 +45,11 @@ object ShellCommandConfig {
*/
val ENV_JAVA_HOME = "JAVA_HOME"
+ /**
+ * The ID assigned to the container by the execution environment (e.g., the YARN container ID)
+ */
+ val ENV_EXECUTION_ENV_CONTAINER_ID = "EXECUTION_ENV_CONTAINER_ID"
+
/*
* The base directory for storing logged data stores used in Samza. This has to be set on all machine running Samza
* containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index e39ea3b..dda0b6b 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -156,7 +156,7 @@ object JobModelManager extends Logging {
jobModelRef.set(jobModel)
val server = new HttpServer
- server.addServlet("/*", new JobServlet(jobModelRef))
+ server.addServlet("/", new JobServlet(jobModelRef))
currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor)
currentJobModelManager
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index e7832a0..6c224e6 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -172,7 +172,7 @@ object Util extends Logging {
readStream(httpConn.getInputStream)
}
- private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+ def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
val conn = url.openConnection()
conn.setConnectTimeout(timeout)
conn.setReadTimeout(timeout)
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
new file mode 100644
index 0000000..6dc07f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.io.IOException;
+import java.net.URL;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link ContainerHeartbeatClient#requestHeartbeat()} with the HTTP call stubbed out.
+ */
+public class TestContainerHeartbeatClient {
+  private final MockContainerHeartbeatClient client =
+      new MockContainerHeartbeatClient("http://fake-endpoint/", "FAKE_CONTAINER_ID");
+
+  @Test
+  public void testClientResponseForHeartbeatAlive() {
+    client.setHttpOutput("{\"alive\": true}");
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertTrue(response.isAlive());
+  }
+
+  @Test
+  public void testClientResponseForHeartbeatDead() {
+    client.setHttpOutput("{\"alive\": false}");
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertFalse(response.isAlive());
+  }
+
+  @Test
+  public void testClientResponseOnBadRequest() {
+    client.shouldThrowException(true);
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertFalse(response.isAlive());
+  }
+
+  @Test
+  public void testClientResponseOnMalformedReply() {
+    // A reply that is not valid JSON must be reported as "not alive" instead of propagating.
+    client.setHttpOutput("not-json");
+    ContainerHeartbeatResponse response = client.requestHeartbeat();
+    Assert.assertFalse(response.isAlive());
+  }
+
+  /**
+   * Replaces the network call with either a canned reply or an {@link IOException}.
+   */
+  private static class MockContainerHeartbeatClient extends ContainerHeartbeatClient {
+    private String httpOutput;
+    private boolean throwException = false;
+
+    MockContainerHeartbeatClient(String coordinatorUrl, String executionEnvContainerId) {
+      super(coordinatorUrl, executionEnvContainerId);
+    }
+
+    void shouldThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    void setHttpOutput(String httpOutput) {
+      this.httpOutput = httpOutput;
+    }
+
+    @Override
+    String httpGet(URL url) throws IOException {
+      if (throwException) {
+        throw new IOException("Exception thrown");
+      }
+      return httpOutput;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
new file mode 100644
index 0000000..829a158
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that {@link ContainerHeartbeatMonitor} runs its expiry callback only when the heartbeat
+ * response reports the container as not alive.
+ */
+public class TestContainerHeartbeatMonitor {
+
+  @Test
+  public void testCallbackWhenHeartbeatDead()
+      throws InterruptedException {
+    ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class);
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(countDownLatch::countDown, mockClient);
+    when(mockClient.requestHeartbeat()).thenReturn(new ContainerHeartbeatResponse(false));
+    monitor.start();
+    try {
+      // The first heartbeat is sent immediately on start(), so the callback should fire well within the timeout.
+      boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
+      Assert.assertTrue(success);
+    } finally {
+      // Shut down the monitor's scheduler so its thread does not outlive the test.
+      monitor.stop();
+    }
+  }
+
+  @Test
+  public void testDoesNotCallbackWhenHeartbeatAlive()
+      throws InterruptedException {
+    ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class);
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(countDownLatch::countDown, client);
+    when(client.requestHeartbeat()).thenReturn(new ContainerHeartbeatResponse(true));
+    monitor.start();
+    try {
+      boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
+      Assert.assertFalse(success);
+      Assert.assertEquals(1, countDownLatch.getCount());
+    } finally {
+      monitor.stop();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
index 84ded62..cdcf2d1 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.samza.clustermanager.SamzaContainerLaunchException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.YarnConfig;
import org.apache.samza.job.CommandBuilder;
import org.apache.samza.util.Util;
@@ -110,6 +111,7 @@ public class YarnContainerRunner {
log.info("Container ID {} using command {}", samzaContainerId, command);
Map<String, String> env = getEscapedEnvironmentVariablesMap(cmdBuilder);
+ env.put(ShellCommandConfig.ENV_EXECUTION_ENV_CONTAINER_ID(), Util.envVarEscape(container.getId().toString()));
printContainerEnvironmentVariables(samzaContainerId, env);
log.info("Samza FWK path: " + command + "; env=" + env);
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
new file mode 100644
index 0000000..002f365
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/webapp/YarnContainerHeartbeatServlet.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.webapp;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.job.yarn.SamzaAppMasterMetrics;
+import org.apache.samza.job.yarn.YarnAppState;
+import org.apache.samza.job.yarn.YarnContainer;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responds to heartbeat requests from the containers with a {@link ContainerHeartbeatResponse}.
+ * The heartbeat request carries the <code>executionContainerId</code> query parameter, which in
+ * YARN's case is the YARN container id. This servlet validates the id against the list of
+ * running containers maintained in the {@link YarnAppState}.
+ *
+ * <p>The returned {@link ContainerHeartbeatResponse#isAlive()} is <code>true</code> iff the
+ * container id exists in {@link YarnAppState#runningYarnContainers}; each negative answer
+ * increments the {@code heartbeats-expired} counter. A missing or malformed id is rejected with
+ * HTTP 400.
+ */
+public class YarnContainerHeartbeatServlet extends HttpServlet {
+
+  private static final String YARN_CONTAINER_ID = "executionContainerId";
+  private static final Logger LOG = LoggerFactory.getLogger(YarnContainerHeartbeatServlet.class);
+  private static final String APPLICATION_JSON = "application/json";
+  private static final String GROUP = SamzaAppMasterMetrics.class.getName();
+  private final Counter heartbeatsExpiredCount;
+
+  private final YarnAppState yarnAppState;
+  private final ObjectMapper mapper;
+
+  /**
+   * @param yarnAppState source of the set of currently running YARN containers
+   * @param registry registry in which the {@code heartbeats-expired} counter is created
+   */
+  public YarnContainerHeartbeatServlet(YarnAppState yarnAppState, ReadableMetricsRegistry registry) {
+    this.yarnAppState = yarnAppState;
+    this.mapper = new ObjectMapper();
+    this.heartbeatsExpiredCount = registry.newCounter(GROUP, "heartbeats-expired");
+  }
+
+  /**
+   * Answers a single heartbeat with a JSON {@link ContainerHeartbeatResponse}, or with HTTP 400 if
+   * the container id parameter is missing or malformed.
+   */
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    String containerIdParam = req.getParameter(YARN_CONTAINER_ID);
+    // Reject a missing parameter explicitly rather than relying on how ContainerId.fromString treats null.
+    if (containerIdParam == null || containerIdParam.isEmpty()) {
+      LOG.error("Heartbeat request is missing the {} parameter", YARN_CONTAINER_ID);
+      resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Missing parameter: " + YARN_CONTAINER_ID);
+      return;
+    }
+    ContainerId yarnContainerId;
+    try {
+      yarnContainerId = ContainerId.fromString(containerIdParam);
+    } catch (IllegalArgumentException e) {
+      LOG.error("Container ID {} passed is invalid", containerIdParam, e);
+      resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
+      return;
+    }
+
+    boolean alive = isRunning(yarnContainerId);
+    if (!alive) {
+      heartbeatsExpiredCount.inc();
+    }
+    // Set the content type before getWriter(): a charset supplied after the writer is obtained is ignored.
+    resp.setContentType(APPLICATION_JSON);
+    PrintWriter printWriter = resp.getWriter();
+    printWriter.write(mapper.writeValueAsString(new ContainerHeartbeatResponse(alive)));
+  }
+
+  /**
+   * Returns whether {@code containerId} matches one of the containers currently tracked as running.
+   */
+  private boolean isRunning(ContainerId containerId) {
+    for (YarnContainer yarnContainer : yarnAppState.runningYarnContainers.values()) {
+      if (yarnContainer.id().compareTo(containerId) == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
index 5f2bfc5..f436f79 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala
@@ -28,7 +28,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
import org.apache.samza.coordinator.stream.messages.SetConfig
import org.apache.samza.metrics.ReadableMetricsRegistry
import org.apache.samza.util.Logging
-import org.apache.samza.webapp.{ApplicationMasterWebServlet, ApplicationMasterRestServlet}
+import org.apache.samza.webapp.{ApplicationMasterRestServlet, ApplicationMasterWebServlet, YarnContainerHeartbeatServlet}
/**
* Samza's application master runs a very basic HTTP/JSON service to allow
@@ -56,6 +56,7 @@ class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationS
webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state))
webApp.start
+ samzaAppState.jobModelManager.server.addServlet("/containerHeartbeat", new YarnContainerHeartbeatServlet(state, registry))
samzaAppState.jobModelManager.start
state.rpcUrl = rpcApp.getUrl
state.trackingUrl = webApp.getUrl
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
new file mode 100644
index 0000000..d6fc254
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.webapp;
+
+import java.io.IOException;
+import java.net.URL;
+import junit.framework.Assert;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.samza.container.ContainerHeartbeatClient;
+import org.apache.samza.container.ContainerHeartbeatResponse;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.job.yarn.YarnAppState;
+import org.apache.samza.job.yarn.YarnContainer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link YarnContainerHeartbeatServlet} end to end through an embedded {@link HttpServer}.
+ */
+public class TestYarnContainerHeartbeatServlet {
+
+  private YarnContainer container;
+  private YarnAppState yarnAppState;
+  private HttpServer webApp;
+  private ObjectMapper mapper;
+
+  /**
+   * Starts an embedded HTTP server with the heartbeat servlet mounted at "/", so that requests to
+   * {@code containerHeartbeat} below the server URL reach it.
+   */
+  @Before
+  public void setup()
+      throws Exception {
+    container = mock(YarnContainer.class);
+    ReadableMetricsRegistry registry = new MetricsRegistryMap("test-registry");
+
+    yarnAppState =
+        new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "testHost", 1, 1);
+    webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet()));
+    webApp.addServlet("/", new YarnContainerHeartbeatServlet(yarnAppState, registry));
+    webApp.start();
+    mapper = new ObjectMapper();
+  }
+
+  @After
+  public void cleanup()
+      throws Exception {
+    webApp.stop();
+  }
+
+  /** Builds the heartbeat URL for the given raw container ID string. */
+  private URL heartbeatUrl(String containerId)
+      throws IOException {
+    return new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + containerId);
+  }
+
+  /** Sends a heartbeat for {@code containerId} and parses the JSON reply. */
+  private ContainerHeartbeatResponse sendHeartbeat(String containerId)
+      throws IOException {
+    String response = Util.read(heartbeatUrl(containerId), 1000);
+    return mapper.readValue(response, ContainerHeartbeatResponse.class);
+  }
+
+  @Test
+  public void testContainerHeartbeatWhenValid()
+      throws IOException {
+    String runningContainerId = "container_1350670447861_0003_01_000002";
+    when(container.id()).thenReturn(ConverterUtils.toContainerId(runningContainerId));
+    yarnAppState.runningYarnContainers.put(runningContainerId, container);
+
+    ContainerHeartbeatResponse heartbeat = sendHeartbeat(runningContainerId);
+    Assert.assertTrue(heartbeat.isAlive());
+  }
+
+  @Test
+  public void testContainerHeartbeatWhenInvalid()
+      throws IOException {
+    String runningContainerId = "container_1350670447861_0003_01_000003";
+    String unknownContainerId = "container_1350670447861_0003_01_000002";
+    when(container.id()).thenReturn(ConverterUtils.toContainerId(runningContainerId));
+    yarnAppState.runningYarnContainers.put(runningContainerId, container);
+
+    ContainerHeartbeatResponse heartbeat = sendHeartbeat(unknownContainerId);
+    Assert.assertFalse(heartbeat.isAlive());
+  }
+
+  @Test
+  public void testContainerHeartbeatWithMalformedId()
+      throws IOException {
+    // Open the connection directly so the raw HTTP status code can be inspected.
+    java.net.HttpURLConnection connection =
+        (java.net.HttpURLConnection) heartbeatUrl("not-a-container-id").openConnection();
+    try {
+      Assert.assertEquals(java.net.HttpURLConnection.HTTP_BAD_REQUEST, connection.getResponseCode());
+    } finally {
+      connection.disconnect();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f6d0d655/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 65c03d1..73c7f49 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -19,19 +19,20 @@
package org.apache.samza.job.yarn
-import java.io.BufferedReader
+import java.io.{BufferedReader, InputStreamReader}
import java.net.URL
-import java.io.InputStreamReader
+
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.clustermanager.SamzaApplicationState
-import org.apache.samza.config.MapConfig
-import org.junit.Assert._
-import org.junit.Test
-import scala.collection.JavaConverters._
-import org.apache.samza.config.Config
+import org.apache.samza.config.{Config, MapConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.JobModelManager
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
+import org.apache.samza.metrics._
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
class TestSamzaYarnAppMasterService {
@@ -40,9 +41,10 @@ class TestSamzaYarnAppMasterService {
val config = getDummyConfig
val jobModelManager = JobModelManager(config)
val samzaState = new SamzaApplicationState(jobModelManager)
+ val registry = new MetricsRegistryMap()
val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
- val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+ val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null)
val taskName = new TaskName("test")
// start the dashboard
@@ -75,8 +77,9 @@ class TestSamzaYarnAppMasterService {
val jobModelManager = JobModelManager(config)
val samzaState = new SamzaApplicationState(jobModelManager)
val state = new YarnAppState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "testHost", 1, 1);
+ val registry = new MetricsRegistryMap()
- val service = new SamzaYarnAppMasterService(config, samzaState, state, null, null)
+ val service = new SamzaYarnAppMasterService(config, samzaState, state, registry, null)
// start the dashboard
service.onInit