You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/03/30 02:47:54 UTC

samza git commit: SAMZA-850 : Yarn Job Validation Tool

Repository: samza
Updated Branches:
  refs/heads/master 9d6831bd1 -> baf9faa1c


SAMZA-850 : Yarn Job Validation Tool


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/baf9faa1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/baf9faa1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/baf9faa1

Branch: refs/heads/master
Commit: baf9faa1cc6170a89ea336326f85b2678772eccd
Parents: 9d6831b
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Mar 29 16:30:44 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Tue Mar 29 16:30:44 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../apache/samza/metrics/MetricsAccessor.java   |  54 ++++++
 .../MetricsValidationFailureException.java      |  32 ++++
 .../apache/samza/metrics/MetricsValidator.java  |  48 +++++
 .../org/apache/samza/job/model/JobModel.java    |  14 ++
 .../samza/metrics/JmxMetricsAccessor.java       |  93 +++++++++
 .../java/org/apache/samza/metrics/JmxUtil.java  |  59 ++++++
 .../samza/metrics/reporter/JmxReporter.scala    |  26 +--
 .../samza/metrics/TestJmxMetricsAccessor.java   |  93 +++++++++
 samza-shell/src/main/bash/validate-yarn-job.sh  |  21 +++
 .../samza/validation/YarnJobValidationTool.java | 189 +++++++++++++++++++
 .../samza/validation/MockMetricsValidator.java  |  50 +++++
 .../validation/TestYarnJobValidationTool.java   | 142 ++++++++++++++
 13 files changed, 797 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 53cb8b4..b5bd365 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -104,6 +104,7 @@
     <subpackage name="metrics">
         <allow pkg="org.apache.samza.config" />
         <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.container" />
     </subpackage>
 
     <subpackage name="task">

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
new file mode 100644
index 0000000..8bd75cd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.Map;
+
+
+/**
+ * A MetricsAccessor allows users to retrieve metric values, based on group name and metric name,
+ * through a specific metrics system, such as JMX.
+ */
+public interface MetricsAccessor {
+  /**
+   * Get the values of a counter.
+   * @param group Group for the counter, e.g. org.apache.samza.container.SamzaContainerMetrics
+   * @param counter Name of the counter, e.g. commit-calls
+   * @return A map of counter values, keyed by type (e.g. the container name), e.g. {"samza-container-0": 100L}
+   */
+  Map<String, Long> getCounterValues(String group, String counter);
+
+  /**
+   * Get the values of a gauge.
+   * The value type is not checked by the accessor: {@code T} must match the type the gauge
+   * actually holds, otherwise a ClassCastException surfaces where the values are used.
+   * @param group Group for the gauge, e.g. org.apache.samza.container.SamzaContainerMetrics
+   * @param gauge Name of the gauge, e.g. event-loop-utilization
+   * @param <T> Type of the gauge value, e.g. Double
+   * @return A map of gauge values, keyed by type (e.g. the container name), e.g. {"samza-container-0": 0.8}
+   */
+  <T> Map<String, T> getGaugeValues(String group, String gauge);
+
+  /**
+   * Get the values of a timer.
+   * @param group Group for the timer, e.g. org.apache.samza.container.SamzaContainerMetrics
+   * @param timer Name of the timer, e.g. choose-ns
+   * @return A map of timer values, keyed by type (e.g. the container name), e.g. {"samza-container-0": 10.5}
+   */
+  Map<String, Double> getTimerValues(String group, String timer);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
new file mode 100644
index 0000000..00c96f3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+/**
+ * Signals that metrics validation failed; declared by the methods of
+ * {@link org.apache.samza.metrics.MetricsValidator}.
+ */
+public class MetricsValidationFailureException extends Exception {
+
+  /**
+   * @param message description of why the validation failed
+   */
+  public MetricsValidationFailureException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of why the validation failed
+   * @param cause the underlying error that led to the failure
+   */
+  public MetricsValidationFailureException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
new file mode 100644
index 0000000..27e7a1c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * A MetricsValidator reads the job's metric values by using a {@link org.apache.samza.metrics.MetricsAccessor}
+ * and validates them.
+ *
+ * The expected call order is: {@link #init(Config)} once, then {@link #validate(MetricsAccessor)}
+ * once per accessor (e.g. once per container), then {@link #complete()} once.
+ */
+public interface MetricsValidator {
+  /**
+   * Initialize with config. Called before any {@link #validate(MetricsAccessor)} call.
+   * @param config Job config
+   */
+  void init(Config config);
+
+  /**
+   * Validate the metric values of a job, as read through the given accessor.
+   * @param accessor Accessor to get the metric values through a specific metrics system, e.g. JMX.
+   * @throws MetricsValidationFailureException when the validation fails.
+   */
+  void validate(MetricsAccessor accessor) throws MetricsValidationFailureException;
+
+  /**
+   * Complete validation after every accessor has been validated. Final checks, e.g. ones
+   * spanning several {@link #validate(MetricsAccessor)} calls, can be performed here.
+   * @throws MetricsValidationFailureException when the validation fails.
+   */
+  void complete() throws MetricsValidationFailureException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 9445a30..dbd6dcc 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -103,6 +103,20 @@ public class JobModel {
     return mappings.get(key);
   }
 
+  /**
+   * Reads the value stored under {@code key} in the container locality mapping of every container.
+   *
+   * @param key Mapping key to read, e.g. {@code SetContainerHostMapping.JMX_TUNNELING_URL_KEY}
+   * @return A map from container id to the value stored under {@code key}; containers without a
+   *         value for {@code key} are omitted. Empty (and immutable) when there is no locality manager.
+   */
+  public Map<Integer, String> getAllContainerToHostValues(String key) {
+    if (localityManager == null) {
+      // Typed empty map instead of the raw Collections.EMPTY_MAP (avoids an unchecked conversion).
+      return Collections.emptyMap();
+    }
+    Map<Integer, String> allValues = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+      String value = entry.getValue().get(key);
+      if (value != null) {
+        allValues.put(entry.getKey(), value);
+      }
+    }
+    return allValues;
+  }
+
   private void populateContainerLocalityMappings() {
     Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
     for (Integer containerId: containers.keySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
new file mode 100644
index 0000000..1581d12
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * JMX metrics accessor.
+ * It connects to a container JMX url,and get metrics values by querying the MBeans.
+ */
+public class JmxMetricsAccessor implements MetricsAccessor {
+  private static final Logger log = LoggerFactory.getLogger(JmxMetricsAccessor.class);
+
+  private final String url;
+  private JMXConnector jmxc;
+
+  public JmxMetricsAccessor(String url) {
+    this.url = url;
+  }
+
+  public void connect() throws IOException {
+    JMXServiceURL jmxUrl = new JMXServiceURL(url);
+    jmxc = JMXConnectorFactory.connect(jmxUrl, null);
+  }
+
+  public void close() throws IOException {
+    jmxc.close();
+  }
+
+  private <T> Map<String, T> getMetricValues(String group, String metric, String attribute) {
+    try {
+      StringBuilder nameBuilder = new StringBuilder();
+      nameBuilder.append(JmxUtil.makeNameJmxSafe(group));
+      nameBuilder.append(":type=*,name=");
+      nameBuilder.append(JmxUtil.makeNameJmxSafe(metric));
+      ObjectName query = new ObjectName(nameBuilder.toString());
+      Map<String, T> values = new HashMap<>();
+      MBeanServerConnection conn = jmxc.getMBeanServerConnection();
+      for (ObjectName objName : conn.queryNames(query, null)) {
+        String type = objName.getKeyProperty("type");
+        T val = (T) conn.getAttribute(objName, attribute);
+        values.put(type, val);
+      }
+      return values;
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+      return Collections.EMPTY_MAP;
+    }
+  }
+
+  @Override
+  public Map<String, Long> getCounterValues(String group, String counter) {
+    return getMetricValues(group, counter, "Count");
+  }
+
+  @Override
+  public <T> Map<String, T> getGaugeValues(String group, String gauge) {
+    return getMetricValues(group, gauge, "Value");
+  }
+
+  @Override
+  public Map<String, Double> getTimerValues(String group, String timer) {
+    return getMetricValues(group, timer, "AverageTime");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
new file mode 100644
index 0000000..6080f7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to create JMX related objects.
+ */
+public class JmxUtil {
+  private static final Logger log = LoggerFactory.getLogger(JmxUtil.class);
+
+  /**
+   * Builds the ObjectName {@code group:type=t,name=name}, sanitizing every part with
+   * {@link #makeNameJmxSafe(String)}.
+   *
+   * @param group MBean domain, e.g. a metrics group such as a class name
+   * @param name value of the {@code name} key property, e.g. a metric name
+   * @param t value of the {@code type} key property
+   * @return the resolved ObjectName
+   * @throws MalformedObjectNameException if the sanitized string is still not a valid ObjectName
+   */
+  public static ObjectName getObjectName(String group, String name, String t) throws MalformedObjectNameException {
+    StringBuilder nameBuilder = new StringBuilder();
+    nameBuilder.append(makeNameJmxSafe(group));
+    nameBuilder.append(":type=");
+    nameBuilder.append(makeNameJmxSafe(t));
+    nameBuilder.append(",name=");
+    nameBuilder.append(makeNameJmxSafe(name));
+    ObjectName objName = new ObjectName(nameBuilder.toString());
+    // Guarded so the message is only built when debug logging is enabled.
+    if (log.isDebugEnabled()) {
+      log.debug("Resolved name for " + group + ", " + name + ", " + t + " to: " + objName);
+    }
+    return objName;
+  }
+
+  /**
+   * Replaces characters that cannot appear (or have special meaning) in an unquoted
+   * ObjectName domain, key or value with {@code _}.
+   *
+   * JMX only has ObjectName.quote, which is pretty nasty looking. This
+   * function escapes without quoting, using the rules outlined in:
+   * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
+   * Comma, equals, colon and quote delimit or quote ObjectName parts, asterisk and question
+   * mark are pattern wildcards, and ObjectName rejects newline characters.
+   *
+   * @param str the raw name; must not be null
+   * @return {@code str} with each of , = : " * ? and newline replaced by {@code _}
+   */
+  public static String makeNameJmxSafe(String str) {
+    return str
+      .replace(",", "_")
+      .replace("=", "_")
+      .replace(":", "_")
+      .replace("\"", "_")
+      .replace("*", "_")
+      .replace("?", "_")
+      .replace("\n", "_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index e966102..63123ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -33,6 +33,7 @@ import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.ReadableMetricsRegistryListener
 import scala.collection.JavaConversions._
 import org.apache.samza.metrics.MetricsVisitor
+import org.apache.samza.metrics.JmxUtil._
 
 class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
   var sources = Map[ReadableMetricsRegistry, String]()
@@ -84,31 +85,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
     }
   }
 
-  def getObjectName(group: String, name: String, t: String) = {
-    val nameBuilder = new StringBuilder
-    nameBuilder.append(makeNameJmxSafe(group))
-    nameBuilder.append(":type=")
-    nameBuilder.append(makeNameJmxSafe(t))
-    nameBuilder.append(",name=")
-    nameBuilder.append(makeNameJmxSafe(name))
-    val objName = new ObjectName(nameBuilder.toString)
-    debug("Resolved name for %s, %s, %s to: %s" format (group, name, t, objName))
-    objName
-  }
-
-  /*
-   * JMX only has ObjectName.quote, which is pretty nasty looking. This 
-   * function escapes without quoting, using the rules outlined in: 
-   * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
-   */
-  def makeNameJmxSafe(str: String) = str
-    .replace(",", "_")
-    .replace("=", "_")
-    .replace(":", "_")
-    .replace("\"", "_")
-    .replace("*", "_")
-    .replace("?", "_")
-
   def registerBean(bean: MetricMBean) {
     if (!server.isRegistered(bean.objectName)) {
       debug("Registering MBean for %s." format bean.objectName)

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
new file mode 100644
index 0000000..5de2cc7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.QueryExp;
+import javax.management.remote.JMXConnector;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestJmxMetricsAccessor {
+  private JmxMetricsAccessor jmxMetricsAccessor;
+  private Set<ObjectName> objectNames;
+  private MBeanServerConnection conn;
+
+  /**
+   * Replaces the accessor's JMX connector (via reflection) with a mock, so MBean queries
+   * return {@link #objectNames} and attribute reads can be stubbed per test.
+   */
+  @Before
+  public void setup() throws Exception {
+    jmxMetricsAccessor = new JmxMetricsAccessor("dummyurl");
+    JMXConnector jmxc = mock(JMXConnector.class);
+    conn = mock(MBeanServerConnection.class);
+    when(jmxc.getMBeanServerConnection()).thenReturn(conn);
+    objectNames = new HashSet<>();
+    when(conn.queryNames(any(ObjectName.class), any(QueryExp.class))).thenReturn(objectNames);
+    Field jmxcField = JmxMetricsAccessor.class.getDeclaredField("jmxc");
+    jmxcField.setAccessible(true);
+    jmxcField.set(jmxMetricsAccessor, jmxc);
+  }
+
+  @Test
+  public void testGetCounterValues() throws Exception {
+    ObjectName counterObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "commit-calls", "samza-container-0");
+    objectNames.add(counterObject);
+    Long commitCalls = 100L;
+    when(conn.getAttribute(counterObject, "Count")).thenReturn(commitCalls);
+
+    Map<String, Long> result = jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(),
+        "commit-calls");
+    assertEquals(1, result.size());
+    assertEquals(commitCalls, result.get("samza-container-0"));
+  }
+
+  @Test
+  public void testGetGaugeValues() throws Exception {
+    ObjectName gaugeObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "event-loop-utilization", "samza-container-1");
+    objectNames.add(gaugeObject);
+    Double loopUtil = 0.8;
+    when(conn.getAttribute(gaugeObject, "Value")).thenReturn(loopUtil);
+
+    Map<String, Double> result = jmxMetricsAccessor.getGaugeValues(SamzaContainerMetrics.class.getName(), "event-loop-utilization");
+    assertEquals(1, result.size());
+    assertEquals(loopUtil, result.get("samza-container-1"));
+  }
+
+  @Test
+  public void testGetTimerValues() throws Exception {
+    ObjectName timerObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "choose-ns", "samza-container-2");
+    objectNames.add(timerObject);
+    Double time = 42.42;
+    when(conn.getAttribute(timerObject, "AverageTime")).thenReturn(time);
+
+    Map<String, Double> result = jmxMetricsAccessor.getTimerValues(SamzaContainerMetrics.class.getName(), "choose-ns");
+    assertEquals(1, result.size());
+    assertEquals(time, result.get("samza-container-2"));
+  }
+
+  /** A failing attribute read is logged and reported as an empty result, not an exception. */
+  @Test
+  public void testGetCounterValuesReturnsEmptyMapWhenAttributeReadFails() throws Exception {
+    ObjectName counterObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "commit-calls", "samza-container-0");
+    objectNames.add(counterObject);
+    when(conn.getAttribute(counterObject, "Count")).thenThrow(new java.io.IOException("connection lost"));
+
+    Map<String, Long> result = jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(),
+        "commit-calls");
+    assertTrue(result.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-shell/src/main/bash/validate-yarn-job.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/validate-yarn-job.sh b/samza-shell/src/main/bash/validate-yarn-job.sh
new file mode 100644
index 0000000..8273a32
--- /dev/null
+++ b/samza-shell/src/main/bash/validate-yarn-job.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.validation.YarnJobValidationTool "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
new file mode 100644
index 0000000..70f1e4f
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.Map;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.yarn.ClientHelper;
+import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsValidator;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line tool for validating the status of a Yarn job.
+ * It checks the job has been successfully submitted to the Yarn cluster, the status of
+ * the application attempt is running and the running container count matches the expectation.
+ * It also supports an optional MetricsValidator plugin through arguments so job metrics can
+ * be validated too using JMX. This tool can be used, for example, as an automated validation
+ * step after starting a job.
+ *
+ * When running this tool, please provide the configuration URI of job. For example:
+ *
+ * deploy/samza/bin/validate-yarn-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties [--metrics-validator=com.foo.bar.SomeMetricsValidator]
+ *
+ * The tool prints out the validation result in each step and throws an exception when the
+ * validation fails.
+ */
+public class YarnJobValidationTool {
+  private static final Logger log = LoggerFactory.getLogger(YarnJobValidationTool.class);
+
+  private final JobConfig config;
+  private final YarnClient client;
+  private final String jobName;
+  private final MetricsValidator validator;
+
+  public YarnJobValidationTool(JobConfig config, YarnClient client, MetricsValidator validator) {
+    this.config = config;
+    this.client = client;
+    String name = this.config.getName().get();
+    String jobId = this.config.getJobId().nonEmpty()? this.config.getJobId().get() : "1";
+    this.jobName =  name + "_" + jobId;
+    this.validator = validator;
+  }
+
+  public void run() {
+    ApplicationId appId;
+    ApplicationAttemptId attemptId;
+
+    try {
+      log.info("Start validating job " + this.jobName);
+
+      appId = validateAppId();
+      attemptId = validateRunningAttemptId(appId);
+      validateContainerCount(attemptId);
+      if(validator != null) {
+        validateJmxMetrics();
+      }
+
+      log.info("End of validation");
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+      System.exit(1);
+    }
+  }
+
+  public ApplicationId validateAppId() throws Exception {
+    // fetch only the last created application with the job name and id
+    // i.e. get the application with max appId
+    ApplicationId appId = null;
+    for(ApplicationReport applicationReport : this.client.getApplications()) {
+      if(applicationReport.getName().equals(this.jobName)) {
+        ApplicationId id = applicationReport.getApplicationId();
+        if(appId == null || appId.compareTo(id) < 0) {
+          appId = id;
+        }
+      }
+    }
+    if (appId != null) {
+      log.info("Job lookup success. ApplicationId " + appId.toString());
+      return appId;
+    } else {
+      throw new SamzaException("Job lookup failure " + this.jobName);
+    }
+  }
+
+  public ApplicationAttemptId validateRunningAttemptId(ApplicationId appId) throws Exception {
+    ApplicationAttemptId attemptId = this.client.getApplicationReport(appId).getCurrentApplicationAttemptId();
+    ApplicationAttemptReport attemptReport = this.client.getApplicationAttemptReport(attemptId);
+    if (attemptReport.getYarnApplicationAttemptState() == YarnApplicationAttemptState.RUNNING) {
+      log.info("Job is running. AttempId " + attemptId.toString());
+      return attemptId;
+    } else {
+      throw new SamzaException("Job not running " + this.jobName);
+    }
+  }
+
+  public int validateContainerCount(ApplicationAttemptId attemptId) throws Exception {
+    int runningContainerCount = 0;
+    for(ContainerReport containerReport : this.client.getContainers(attemptId)) {
+      if(containerReport.getContainerState() == ContainerState.RUNNING) {
+        ++runningContainerCount;
+      }
+    }
+    // expected containers to be the configured job containers plus the AppMaster container
+    int containerExpected = this.config.getContainerCount() + 1;
+
+    if (runningContainerCount == containerExpected) {
+      log.info("Container count matches. " + runningContainerCount + " containers are running.");
+      return runningContainerCount;
+    } else {
+      throw new SamzaException("Container count does not match. " + runningContainerCount + " containers are running, while " + containerExpected + " is expected.");
+    }
+  }
+
+  public void validateJmxMetrics() throws Exception {
+    JobCoordinator jobCoordinator = JobCoordinator.apply(config);
+    validator.init(config);
+    Map<Integer, String> jmxUrls = jobCoordinator.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+    for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) {
+      Integer containerId = entry.getKey();
+      String jmxUrl = entry.getValue();
+      log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
+      JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+      jmxMetrics.connect();
+      validator.validate(jmxMetrics);
+      jmxMetrics.close();
+      log.info("validate container " + containerId + " successfully");
+    }
+    validator.complete();
+  }
+
+  public static void main(String [] args) throws Exception {
+    CommandLine cmdline = new CommandLine();
+    OptionParser parser = cmdline.parser();
+    OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.")
+                                            .withOptionalArg()
+                                            .ofType(String.class).describedAs("com.foo.bar.ClassName");
+    OptionSet options = cmdline.parser().parse(args);
+    Config config = cmdline.loadConfig(options);
+    MetricsValidator validator = null;
+    if (options.has(validatorOpt)) {
+      String validatorClass = options.valueOf(validatorOpt);
+      validator = (MetricsValidator) Class.forName(validatorClass).newInstance();
+    }
+
+    YarnConfiguration hadoopConfig = new YarnConfiguration();
+    hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+    hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName());
+    ClientHelper clientHelper = new ClientHelper(hadoopConfig);
+
+    new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
new file mode 100644
index 0000000..c3cf935
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.MetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.apache.samza.metrics.MetricsValidator;
+
+
+/**
+ * Test-only {@link MetricsValidator} that checks the {@code commit-calls} counter registered under
+ * {@link SamzaContainerMetrics}: at least one value must be reported, and every reported value
+ * must be strictly positive.
+ */
+public class MockMetricsValidator implements MetricsValidator {
+
+  @Override
+  public void init(Config config) {
+    // The mock needs no configuration.
+  }
+
+  /**
+   * Validates the commit-calls counters exposed by {@code accessor}.
+   *
+   * @param accessor source of counter values (presumably keyed by container name; see the tests)
+   * @throws MetricsValidationFailureException if no counter is reported, or any reported value is <= 0
+   */
+  @Override
+  public void validate(MetricsAccessor accessor) throws MetricsValidationFailureException {
+    String metricsClass = SamzaContainerMetrics.class.getName();
+    Map<String, Long> commitCounters = accessor.getCounterValues(metricsClass, "commit-calls");
+
+    // Nothing reported at all is treated as a failure, not as vacuous success.
+    if (commitCounters.isEmpty()) {
+      throw new MetricsValidationFailureException("no value");
+    }
+
+    for (Long commits : commitCounters.values()) {
+      if (commits <= 0) {
+        throw new MetricsValidationFailureException("commit call <= 0");
+      }
+    }
+  }
+
+  @Override
+  public void complete() {
+    // No aggregate state to finalize.
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
new file mode 100644
index 0000000..7a8d291
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.*;
+
+public class TestYarnJobValidationTool {
+  private YarnClient client;
+  private YarnJobValidationTool tool;
+  private String jobName = "test";
+  private int jobId = 1;
+  private ApplicationId appId;
+  ApplicationAttemptId attemptId;
+  private int containerCount = 9;
+  private Config config = new MapConfig(new HashMap<String, String>() {
+    {
+      put("job.name", jobName);
+      put("job.id", String.valueOf(jobId));
+      put("yarn.container.count", String.valueOf(containerCount));
+    }
+  });
+  private MockMetricsValidator validator = new MockMetricsValidator();
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  @Before
+  public void setup() throws Exception {
+    client = mock(YarnClient.class);
+    tool = new YarnJobValidationTool(new JobConfig(config), client, validator);
+    appId = mock(ApplicationId.class);
+    when(appId.getId()).thenReturn(1111);
+    attemptId = mock(ApplicationAttemptId.class);
+    when(attemptId.getApplicationId()).thenReturn(appId);
+    when(attemptId.getAttemptId()).thenReturn(2222);
+  }
+
+  @Test
+  public void testValidateAppId() throws Exception {
+    ApplicationReport appReport = mock(ApplicationReport.class);
+    when(appReport.getName()).thenReturn(jobName + "_" + jobId);
+    when(appReport.getApplicationId()).thenReturn(appId);
+    when(client.getApplications()).thenReturn(Collections.singletonList(appReport));
+    assertTrue(tool.validateAppId().equals(appId));
+
+    when(appReport.getName()).thenReturn("dummy");
+    exception.expect(SamzaException.class);
+    tool.validateAppId();
+  }
+
+  @Test
+  public void testValidateRunningAttemptId() throws Exception {
+    ApplicationReport appReport = mock(ApplicationReport.class);
+    when(client.getApplicationReport(appId)).thenReturn(appReport);
+    when(appReport.getCurrentApplicationAttemptId()).thenReturn(attemptId);
+    ApplicationAttemptReport attemptReport = mock(ApplicationAttemptReport.class);
+    when(attemptReport.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.RUNNING);
+    when(attemptReport.getApplicationAttemptId()).thenReturn(attemptId);
+    when(client.getApplicationAttemptReport(attemptId)).thenReturn(attemptReport);
+    assertTrue(tool.validateRunningAttemptId(appId).equals(attemptId));
+
+    when(attemptReport.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.FAILED);
+    exception.expect(SamzaException.class);
+    tool.validateRunningAttemptId(appId);
+  }
+
+  @Test
+  public void testValidateContainerCount() throws Exception {
+    List<ContainerReport> containerReports = new ArrayList<>();
+    for (int i = 0; i <= containerCount; i++) {
+      ContainerReport report = mock(ContainerReport.class);
+      when(report.getContainerState()).thenReturn(ContainerState.RUNNING);
+      containerReports.add(report);
+    }
+    when(client.getContainers(attemptId)).thenReturn(containerReports);
+    assertTrue(tool.validateContainerCount(attemptId) == (containerCount + 1));
+
+    containerReports.remove(0);
+    exception.expect(SamzaException.class);
+    tool.validateContainerCount(attemptId);
+  }
+
+  @Test
+  public void testValidateJmxMetrics() throws MetricsValidationFailureException {
+    JmxMetricsAccessor jmxMetricsAccessor = mock(JmxMetricsAccessor.class);
+    Map<String, Long> values = new HashMap<>();
+    values.put("samza-container-0", 100L);
+    when(jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(), "commit-calls")).thenReturn(values);
+    validator.validate(jmxMetricsAccessor);
+
+    values.put("samza-container-0", -1L);
+    // the mock validator will fail if the commit-calls are less than or equal to 0
+    exception.expect(MetricsValidationFailureException.class);
+    validator.validate(jmxMetricsAccessor);
+  }
+}