You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/03/30 02:47:54 UTC
samza git commit: SAMZA-850 : Yarn Job Validation Tool
Repository: samza
Updated Branches:
refs/heads/master 9d6831bd1 -> baf9faa1c
SAMZA-850 : Yarn Job Validation Tool
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/baf9faa1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/baf9faa1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/baf9faa1
Branch: refs/heads/master
Commit: baf9faa1cc6170a89ea336326f85b2678772eccd
Parents: 9d6831b
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Tue Mar 29 16:30:44 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Tue Mar 29 16:30:44 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
.../apache/samza/metrics/MetricsAccessor.java | 54 ++++++
.../MetricsValidationFailureException.java | 32 ++++
.../apache/samza/metrics/MetricsValidator.java | 48 +++++
.../org/apache/samza/job/model/JobModel.java | 14 ++
.../samza/metrics/JmxMetricsAccessor.java | 93 +++++++++
.../java/org/apache/samza/metrics/JmxUtil.java | 59 ++++++
.../samza/metrics/reporter/JmxReporter.scala | 26 +--
.../samza/metrics/TestJmxMetricsAccessor.java | 93 +++++++++
samza-shell/src/main/bash/validate-yarn-job.sh | 21 +++
.../samza/validation/YarnJobValidationTool.java | 189 +++++++++++++++++++
.../samza/validation/MockMetricsValidator.java | 50 +++++
.../validation/TestYarnJobValidationTool.java | 142 ++++++++++++++
13 files changed, 797 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 53cb8b4..b5bd365 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -104,6 +104,7 @@
<subpackage name="metrics">
<allow pkg="org.apache.samza.config" />
<allow pkg="org.apache.samza.util" />
+ <allow pkg="org.apache.samza.container" />
</subpackage>
<subpackage name="task">
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
new file mode 100644
index 0000000..8bd75cd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.util.Map;
+
+
+/**
+ * A MetricsAccessor allows users to retrieve metric values, based on group name and metric name,
+ * through a specific metrics system, such as JMX.
+ * Every getter returns a map so that one call can report the same metric for several sources.
+ */
+public interface MetricsAccessor {
+ /**
+ * Get the values of a counter.
+ * @param group Group for the counter, e.g. org.apache.samza.container.SamzaContainerMetrics
+ * @param counter Name of the counter, e.g. commit-calls
+ * @return A map of counter values, keyed by type, e.g. {"samza-container-0": 100L}
+ */
+ Map<String, Long> getCounterValues(String group, String counter);
+
+ /**
+ * Get the values of a gauge.
+ * @param group Group for the gauge, e.g. org.apache.samza.container.SamzaContainerMetrics
+ * @param gauge Name of the gauge, e.g. event-loop-utilization
+ * @param <T> Type of the gauge value, e.g. Double; it is chosen by the caller
+ * @return A map of gauge values, keyed by type, e.g. {"samza-container-0": 0.8}
+ */
+ <T> Map<String, T> getGaugeValues(String group, String gauge);
+
+ /**
+ * Get the values of a timer.
+ * @param group Group for the timer, e.g. org.apache.samza.container.SamzaContainerMetrics
+ * @param timer Name of the timer, e.g. choose-ns
+ * @return A map of timer values, keyed by type, e.g. {"samza-container-0": 10.5}
+ */
+ Map<String, Double> getTimerValues(String group, String timer);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
new file mode 100644
index 0000000..00c96f3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+/**
+ * Checked exception signalling that a metrics validation did not pass.
+ * See {@link org.apache.samza.metrics.MetricsValidator}.
+ */
+public class MetricsValidationFailureException extends Exception {
+
+  /**
+   * Creates the exception with a description of the failure.
+   * @param message Description of why the validation failed
+   */
+  public MetricsValidationFailureException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates the exception with a description of the failure and its underlying cause.
+   * @param message Description of why the validation failed
+   * @param cause Throwable that led to the validation failure
+   */
+  public MetricsValidationFailureException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
new file mode 100644
index 0000000..27e7a1c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * A MetricsValidator reads the job's metrics values by using the {@link org.apache.samza.metrics.MetricsAccessor},
+ * and validates them.
+ * The Yarn job validation tool calls {@link #init(Config)} once, then {@link #validate(MetricsAccessor)}
+ * once per container accessor, and finally {@link #complete()}.
+ */
+public interface MetricsValidator {
+ /**
+ * Initialize with config.
+ * @param config Job config
+ */
+ void init(Config config);
+
+ /**
+ * Validate the metrics values of a job.
+ * @param accessor Accessor to get the metrics values through specific metrics system, e.g. JMX.
+ * @throws MetricsValidationFailureException Exception when the validation fails.
+ */
+ void validate(MetricsAccessor accessor) throws MetricsValidationFailureException;
+
+ /**
+ * Complete validation. Final checks can be performed here.
+ * @throws MetricsValidationFailureException Exception when the validation fails.
+ */
+ void complete() throws MetricsValidationFailureException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 9445a30..dbd6dcc 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -103,6 +103,20 @@ public class JobModel {
return mappings.get(key);
}
+  /**
+   * Collects, for every container that has one, the locality value stored under {@code key}.
+   * @param key Locality mapping key to look up for each container
+   * @return A map from container id to the value recorded for {@code key}; containers without a
+   *         value for the key are omitted. An empty map is returned when there is no locality manager.
+   */
+  public Map<Integer, String> getAllContainerToHostValues(String key) {
+    if (localityManager == null) {
+      // Type-safe equivalent of the raw Collections.EMPTY_MAP constant (no unchecked conversion).
+      return Collections.emptyMap();
+    }
+    Map<Integer, String> allValues = new HashMap<>();
+    for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) {
+      String value = entry.getValue().get(key);
+      // Skip containers that have no mapping for this key instead of storing null.
+      if (value != null) {
+        allValues.put(entry.getKey(), value);
+      }
+    }
+    return allValues;
+  }
+
private void populateContainerLocalityMappings() {
Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality();
for (Integer containerId: containers.keySet()) {
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
new file mode 100644
index 0000000..1581d12
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * JMX metrics accessor.
+ * It connects to a container JMX url, and gets metrics values by querying the MBeans.
+ * Call {@link #connect()} before reading metrics and {@link #close()} when done. Reading metrics
+ * without an open connection is logged as an error and yields an empty map.
+ */
+public class JmxMetricsAccessor implements MetricsAccessor {
+  private static final Logger log = LoggerFactory.getLogger(JmxMetricsAccessor.class);
+
+  private final String url;
+  // Stays null until connect() succeeds.
+  private JMXConnector jmxc;
+
+  /**
+   * @param url JMX service URL of the container to read metrics from
+   */
+  public JmxMetricsAccessor(String url) {
+    this.url = url;
+  }
+
+  /**
+   * Opens the JMX connection to the configured url.
+   * @throws IOException If the url is malformed or the connection cannot be established
+   */
+  public void connect() throws IOException {
+    JMXServiceURL jmxUrl = new JMXServiceURL(url);
+    jmxc = JMXConnectorFactory.connect(jmxUrl, null);
+  }
+
+  /**
+   * Closes the JMX connection. Does nothing when no connection was opened, so it is safe to
+   * call after a failed or missing {@link #connect()}.
+   * @throws IOException If the connection cannot be closed cleanly
+   */
+  public void close() throws IOException {
+    if (jmxc != null) {
+      jmxc.close();
+    }
+  }
+
+  /**
+   * Queries every MBean matching {@code <group>:type=*,name=<metric>} and reads one attribute from each.
+   * Any failure is logged and reported as an empty map rather than propagated to the caller.
+   * @param group Metrics group, used as the JMX domain
+   * @param metric Metric name, used as the "name" key property
+   * @param attribute MBean attribute to read, e.g. Count
+   * @param <T> Expected type of the attribute value; the cast is unchecked
+   * @return A map from the MBean's "type" key property to the attribute value
+   */
+  private <T> Map<String, T> getMetricValues(String group, String metric, String attribute) {
+    try {
+      StringBuilder nameBuilder = new StringBuilder();
+      nameBuilder.append(JmxUtil.makeNameJmxSafe(group));
+      nameBuilder.append(":type=*,name=");
+      nameBuilder.append(JmxUtil.makeNameJmxSafe(metric));
+      ObjectName query = new ObjectName(nameBuilder.toString());
+      Map<String, T> values = new HashMap<>();
+      MBeanServerConnection conn = jmxc.getMBeanServerConnection();
+      for (ObjectName objName : conn.queryNames(query, null)) {
+        String type = objName.getKeyProperty("type");
+        // The attribute's runtime type is only known to the caller, so the cast cannot be checked here.
+        @SuppressWarnings("unchecked")
+        T val = (T) conn.getAttribute(objName, attribute);
+        values.put(type, val);
+      }
+      return values;
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+      // Type-safe equivalent of the raw Collections.EMPTY_MAP constant.
+      return Collections.emptyMap();
+    }
+  }
+
+  @Override
+  public Map<String, Long> getCounterValues(String group, String counter) {
+    return getMetricValues(group, counter, "Count");
+  }
+
+  @Override
+  public <T> Map<String, T> getGaugeValues(String group, String gauge) {
+    return getMetricValues(group, gauge, "Value");
+  }
+
+  @Override
+  public Map<String, Double> getTimerValues(String group, String timer) {
+    return getMetricValues(group, timer, "AverageTime");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
new file mode 100644
index 0000000..6080f7c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java
@@ -0,0 +1,59 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.metrics;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to create JMX related objects
+ */
+public class JmxUtil {
+ private static final Logger log = LoggerFactory.getLogger(JmxUtil.class);
+
+ public static ObjectName getObjectName(String group, String name, String t) throws MalformedObjectNameException {
+ StringBuilder nameBuilder = new StringBuilder();
+ nameBuilder.append(makeNameJmxSafe(group));
+ nameBuilder.append(":type=");
+ nameBuilder.append(makeNameJmxSafe(t));
+ nameBuilder.append(",name=");
+ nameBuilder.append(makeNameJmxSafe(name));
+ ObjectName objName = new ObjectName(nameBuilder.toString());
+ log.debug("Resolved name for " + group + ", " + name + ", " + t + " to: " + objName);
+ return objName;
+ }
+
+ /*
+ * JMX only has ObjectName.quote, which is pretty nasty looking. This
+ * function escapes without quoting, using the rules outlined in:
+ * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
+ */
+ public static String makeNameJmxSafe(String str) {
+ return str
+ .replace(",", "_")
+ .replace("=", "_")
+ .replace(":", "_")
+ .replace("\"", "_")
+ .replace("*", "_")
+ .replace("?", "_");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index e966102..63123ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
@@ -33,6 +33,7 @@ import org.apache.samza.metrics.ReadableMetricsRegistry
import org.apache.samza.metrics.ReadableMetricsRegistryListener
import scala.collection.JavaConversions._
import org.apache.samza.metrics.MetricsVisitor
+import org.apache.samza.metrics.JmxUtil._
class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
var sources = Map[ReadableMetricsRegistry, String]()
@@ -84,31 +85,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
}
}
- def getObjectName(group: String, name: String, t: String) = {
- val nameBuilder = new StringBuilder
- nameBuilder.append(makeNameJmxSafe(group))
- nameBuilder.append(":type=")
- nameBuilder.append(makeNameJmxSafe(t))
- nameBuilder.append(",name=")
- nameBuilder.append(makeNameJmxSafe(name))
- val objName = new ObjectName(nameBuilder.toString)
- debug("Resolved name for %s, %s, %s to: %s" format (group, name, t, objName))
- objName
- }
-
- /*
- * JMX only has ObjectName.quote, which is pretty nasty looking. This
- * function escapes without quoting, using the rules outlined in:
- * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html
- */
- def makeNameJmxSafe(str: String) = str
- .replace(",", "_")
- .replace("=", "_")
- .replace(":", "_")
- .replace("\"", "_")
- .replace("*", "_")
- .replace("?", "_")
-
def registerBean(bean: MetricMBean) {
if (!server.isRegistered(bean.objectName)) {
debug("Registering MBean for %s." format bean.objectName)
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
new file mode 100644
index 0000000..5de2cc7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics;
+
+import java.lang.reflect.Field;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.QueryExp;
+import javax.management.remote.JMXConnector;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link JmxMetricsAccessor}, run against a mocked JMX connection.
+ */
+public class TestJmxMetricsAccessor {
+  private JmxMetricsAccessor jmxMetricsAccessor;
+  private Set<ObjectName> objectNames;
+  private MBeanServerConnection conn;
+
+  @Before
+  public void setup() throws Exception {
+    jmxMetricsAccessor = new JmxMetricsAccessor("dummyurl");
+    JMXConnector jmxc = mock(JMXConnector.class);
+    conn = mock(MBeanServerConnection.class);
+    when(jmxc.getMBeanServerConnection()).thenReturn(conn);
+    // Each test adds the object names it wants the mocked query to return.
+    objectNames = new HashSet<>();
+    when(conn.queryNames(any(ObjectName.class), any(QueryExp.class))).thenReturn(objectNames);
+    // Inject the mocked connector directly so that no real JMX connection is opened.
+    Field jmxcField = JmxMetricsAccessor.class.getDeclaredField("jmxc");
+    jmxcField.setAccessible(true);
+    jmxcField.set(jmxMetricsAccessor, jmxc);
+  }
+
+  @Test
+  public void testGetCounterValues() throws Exception {
+    ObjectName counterObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "commit-calls", "samza-container-0");
+    objectNames.add(counterObject);
+    Long commitCalls = 100L;
+    when(conn.getAttribute(counterObject, "Count")).thenReturn(commitCalls);
+
+    Map<String, Long> result = jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(),
+        "commit-calls");
+    // assertEquals reports the expected and actual values when the check fails.
+    assertEquals(1, result.size());
+    assertEquals(commitCalls, result.get("samza-container-0"));
+  }
+
+  @Test
+  public void testGetGaugeValues() throws Exception {
+    ObjectName gaugeObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "event-loop-utilization", "samza-container-1");
+    objectNames.add(gaugeObject);
+    Double loopUtil = 0.8;
+    when(conn.getAttribute(gaugeObject, "Value")).thenReturn(loopUtil);
+
+    Map<String, Double> result = jmxMetricsAccessor.getGaugeValues(SamzaContainerMetrics.class.getName(), "event-loop-utilization");
+    assertEquals(1, result.size());
+    assertEquals(loopUtil, result.get("samza-container-1"));
+  }
+
+  @Test
+  public void testGetTimerValues() throws Exception {
+    ObjectName timerObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "choose-ns", "samza-container-2");
+    objectNames.add(timerObject);
+    Double time = 42.42;
+    when(conn.getAttribute(timerObject, "AverageTime")).thenReturn(time);
+
+    Map<String, Double> result = jmxMetricsAccessor.getTimerValues(SamzaContainerMetrics.class.getName(), "choose-ns");
+    assertEquals(1, result.size());
+    assertEquals(time, result.get("samza-container-2"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-shell/src/main/bash/validate-yarn-job.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/validate-yarn-job.sh b/samza-shell/src/main/bash/validate-yarn-job.sh
new file mode 100644
index 0000000..8273a32
--- /dev/null
+++ b/samza-shell/src/main/bash/validate-yarn-job.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Default to the console log4j configuration unless the caller already supplied one.
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname "$0")/log4j-console.xml"
+
+# Quote the script directory so that an install path containing spaces is not word-split.
+exec "$(dirname "$0")/run-class.sh" org.apache.samza.validation.YarnJobValidationTool "$@"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
new file mode 100644
index 0000000..70f1e4f
--- /dev/null
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.Map;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.yarn.ClientHelper;
+import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsValidator;
+import org.apache.samza.util.hadoop.HttpFileSystem;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line tool for validating the status of a Yarn job.
+ * It checks the job has been successfully submitted to the Yarn cluster, the status of
+ * the application attempt is running and the running container count matches the expectation.
+ * It also supports an optional MetricsValidator plugin through arguments so job metrics can
+ * be validated too using JMX. This tool can be used, for example, as an automated validation
+ * step after starting a job.
+ *
+ * When running this tool, please provide the configuration URI of job. For example:
+ *
+ * deploy/samza/bin/validate-yarn-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties [--metrics-validator=com.foo.bar.SomeMetricsValidator]
+ *
+ * The tool logs the validation result of each step. The individual validate methods throw an
+ * exception when their check fails; {@link #run()} logs that exception and exits the JVM with status 1.
+ */
+public class YarnJobValidationTool {
+  private static final Logger log = LoggerFactory.getLogger(YarnJobValidationTool.class);
+
+  private final JobConfig config;
+  private final YarnClient client;
+  private final String jobName;
+  private final MetricsValidator validator;
+
+  /**
+   * @param config Job config; the job name is read from it and the job id defaults to "1"
+   * @param client Yarn client used to look up the application, its attempt and its containers
+   * @param validator Optional metrics validator; when null the JMX metrics step is skipped
+   */
+  public YarnJobValidationTool(JobConfig config, YarnClient client, MetricsValidator validator) {
+    this.config = config;
+    this.client = client;
+    String name = this.config.getName().get();
+    String jobId = this.config.getJobId().nonEmpty() ? this.config.getJobId().get() : "1";
+    // Applications are matched against "<name>_<id>" in validateAppId().
+    this.jobName = name + "_" + jobId;
+    this.validator = validator;
+  }
+
+  /**
+   * Runs all validation steps in order. Any failure is logged and terminates the JVM with exit status 1.
+   */
+  public void run() {
+    try {
+      log.info("Start validating job " + this.jobName);
+
+      ApplicationId appId = validateAppId();
+      ApplicationAttemptId attemptId = validateRunningAttemptId(appId);
+      validateContainerCount(attemptId);
+      if (validator != null) {
+        validateJmxMetrics();
+      }
+
+      log.info("End of validation");
+    } catch (Exception e) {
+      log.error(e.getMessage(), e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Looks up the Yarn application whose name matches this job.
+   * @return The greatest application id among the applications with the job's name
+   * @throws Exception If the lookup fails or no application has the job's name
+   */
+  public ApplicationId validateAppId() throws Exception {
+    // fetch only the last created application with the job name and id
+    // i.e. get the application with max appId
+    ApplicationId appId = null;
+    for (ApplicationReport applicationReport : this.client.getApplications()) {
+      if (applicationReport.getName().equals(this.jobName)) {
+        ApplicationId id = applicationReport.getApplicationId();
+        if (appId == null || appId.compareTo(id) < 0) {
+          appId = id;
+        }
+      }
+    }
+    if (appId != null) {
+      log.info("Job lookup success. ApplicationId " + appId.toString());
+      return appId;
+    } else {
+      throw new SamzaException("Job lookup failure " + this.jobName);
+    }
+  }
+
+  /**
+   * Checks that the current attempt of the application is in the RUNNING state.
+   * @param appId Application to inspect
+   * @return The id of the current application attempt
+   * @throws Exception If the lookup fails or the attempt is not running
+   */
+  public ApplicationAttemptId validateRunningAttemptId(ApplicationId appId) throws Exception {
+    ApplicationAttemptId attemptId = this.client.getApplicationReport(appId).getCurrentApplicationAttemptId();
+    ApplicationAttemptReport attemptReport = this.client.getApplicationAttemptReport(attemptId);
+    if (attemptReport.getYarnApplicationAttemptState() == YarnApplicationAttemptState.RUNNING) {
+      log.info("Job is running. AttemptId " + attemptId.toString());
+      return attemptId;
+    } else {
+      throw new SamzaException("Job not running " + this.jobName);
+    }
+  }
+
+  /**
+   * Checks that the number of running containers equals the configured container count plus one.
+   * @param attemptId Application attempt whose containers are counted
+   * @return The number of running containers
+   * @throws Exception If the lookup fails or the count does not match
+   */
+  public int validateContainerCount(ApplicationAttemptId attemptId) throws Exception {
+    int runningContainerCount = 0;
+    for (ContainerReport containerReport : this.client.getContainers(attemptId)) {
+      if (containerReport.getContainerState() == ContainerState.RUNNING) {
+        ++runningContainerCount;
+      }
+    }
+    // expected containers to be the configured job containers plus the AppMaster container
+    int containerExpected = this.config.getContainerCount() + 1;
+
+    if (runningContainerCount == containerExpected) {
+      log.info("Container count matches. " + runningContainerCount + " containers are running.");
+      return runningContainerCount;
+    } else {
+      throw new SamzaException("Container count does not match. " + runningContainerCount + " containers are running, while " + containerExpected + " is expected.");
+    }
+  }
+
+  /**
+   * Runs the metrics validator against every container that has a JMX tunneling url recorded.
+   * The validator is initialized once, invoked once per container, and completed at the end.
+   * @throws Exception If a JMX connection fails or the validator rejects the metrics
+   */
+  public void validateJmxMetrics() throws Exception {
+    JobCoordinator jobCoordinator = JobCoordinator.apply(config);
+    validator.init(config);
+    Map<Integer, String> jmxUrls = jobCoordinator.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
+    for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) {
+      Integer containerId = entry.getKey();
+      String jmxUrl = entry.getValue();
+      log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl);
+      JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+      // connect() stays outside the try: when it fails there is no connection to close.
+      jmxMetrics.connect();
+      try {
+        validator.validate(jmxMetrics);
+      } finally {
+        // Release the JMX connection even when validation throws.
+        jmxMetrics.close();
+      }
+      log.info("validate container " + containerId + " successfully");
+    }
+    validator.complete();
+  }
+
+  /**
+   * Entry point. Loads the job config from the command line, optionally instantiates the class
+   * given by --metrics-validator, and runs the validation.
+   * @param args Command-line arguments
+   * @throws Exception If the arguments cannot be parsed or the validator class cannot be instantiated
+   */
+  public static void main(String [] args) throws Exception {
+    CommandLine cmdline = new CommandLine();
+    OptionParser parser = cmdline.parser();
+    // The class name is mandatory once the flag is given; a bare flag would otherwise yield a
+    // null value and fail later inside Class.forName.
+    OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.")
+        .withRequiredArg()
+        .ofType(String.class).describedAs("com.foo.bar.ClassName");
+    OptionSet options = cmdline.parser().parse(args);
+    Config config = cmdline.loadConfig(options);
+    MetricsValidator validator = null;
+    if (options.has(validatorOpt)) {
+      String validatorClass = options.valueOf(validatorOpt);
+      validator = (MetricsValidator) Class.forName(validatorClass).newInstance();
+    }
+
+    YarnConfiguration hadoopConfig = new YarnConfiguration();
+    hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName());
+    hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName());
+    ClientHelper clientHelper = new ClientHelper(hadoopConfig);
+
+    new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
new file mode 100644
index 0000000..c3cf935
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.MetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.apache.samza.metrics.MetricsValidator;
+
+
+/**
+ * Test-only {@link MetricsValidator} that inspects the "commit-calls" counter
+ * registered under {@link SamzaContainerMetrics}. Validation fails when the
+ * accessor reports no value at all, or when any reported value is zero or
+ * negative.
+ */
+public class MockMetricsValidator implements MetricsValidator {
+
+  /** This mock needs no setup; the config is ignored. */
+  @Override
+  public void init(Config config) {
+  }
+
+  /**
+   * Checks that every "commit-calls" counter exposed by the accessor is
+   * strictly positive.
+   *
+   * @param accessor source of the counter values to check
+   * @throws MetricsValidationFailureException if no counter value is returned
+   *         or if any returned value is less than or equal to zero
+   */
+  @Override
+  public void validate(MetricsAccessor accessor) throws MetricsValidationFailureException {
+    String metricsGroup = SamzaContainerMetrics.class.getName();
+    Map<String, Long> counters = accessor.getCounterValues(metricsGroup, "commit-calls");
+    if (counters.isEmpty()) {
+      throw new MetricsValidationFailureException("no value");
+    }
+    // Only the counter values matter here; the keys are not inspected.
+    for (Long calls : counters.values()) {
+      if (calls <= 0) {
+        throw new MetricsValidationFailureException("commit call <= 0");
+      }
+    }
+  }
+
+  /** This mock has nothing to finalize. */
+  @Override
+  public void complete() {
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
new file mode 100644
index 0000000..7a8d291
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.*;
+
+public class TestYarnJobValidationTool {
+ private YarnClient client;
+ private YarnJobValidationTool tool;
+ private String jobName = "test";
+ private int jobId = 1;
+ private ApplicationId appId;
+ ApplicationAttemptId attemptId;
+ private int containerCount = 9;
+ private Config config = new MapConfig(new HashMap<String, String>() {
+ {
+ put("job.name", jobName);
+ put("job.id", String.valueOf(jobId));
+ put("yarn.container.count", String.valueOf(containerCount));
+ }
+ });
+ private MockMetricsValidator validator = new MockMetricsValidator();
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Before
+ public void setup() throws Exception {
+ client = mock(YarnClient.class);
+ tool = new YarnJobValidationTool(new JobConfig(config), client, validator);
+ appId = mock(ApplicationId.class);
+ when(appId.getId()).thenReturn(1111);
+ attemptId = mock(ApplicationAttemptId.class);
+ when(attemptId.getApplicationId()).thenReturn(appId);
+ when(attemptId.getAttemptId()).thenReturn(2222);
+ }
+
+ @Test
+ public void testValidateAppId() throws Exception {
+ ApplicationReport appReport = mock(ApplicationReport.class);
+ when(appReport.getName()).thenReturn(jobName + "_" + jobId);
+ when(appReport.getApplicationId()).thenReturn(appId);
+ when(client.getApplications()).thenReturn(Collections.singletonList(appReport));
+ assertTrue(tool.validateAppId().equals(appId));
+
+ when(appReport.getName()).thenReturn("dummy");
+ exception.expect(SamzaException.class);
+ tool.validateAppId();
+ }
+
+ @Test
+ public void testValidateRunningAttemptId() throws Exception {
+ ApplicationReport appReport = mock(ApplicationReport.class);
+ when(client.getApplicationReport(appId)).thenReturn(appReport);
+ when(appReport.getCurrentApplicationAttemptId()).thenReturn(attemptId);
+ ApplicationAttemptReport attemptReport = mock(ApplicationAttemptReport.class);
+ when(attemptReport.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.RUNNING);
+ when(attemptReport.getApplicationAttemptId()).thenReturn(attemptId);
+ when(client.getApplicationAttemptReport(attemptId)).thenReturn(attemptReport);
+ assertTrue(tool.validateRunningAttemptId(appId).equals(attemptId));
+
+ when(attemptReport.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.FAILED);
+ exception.expect(SamzaException.class);
+ tool.validateRunningAttemptId(appId);
+ }
+
+ @Test
+ public void testValidateContainerCount() throws Exception {
+ List<ContainerReport> containerReports = new ArrayList<>();
+ for (int i = 0; i <= containerCount; i++) {
+ ContainerReport report = mock(ContainerReport.class);
+ when(report.getContainerState()).thenReturn(ContainerState.RUNNING);
+ containerReports.add(report);
+ }
+ when(client.getContainers(attemptId)).thenReturn(containerReports);
+ assertTrue(tool.validateContainerCount(attemptId) == (containerCount + 1));
+
+ containerReports.remove(0);
+ exception.expect(SamzaException.class);
+ tool.validateContainerCount(attemptId);
+ }
+
+ @Test
+ public void testValidateJmxMetrics() throws MetricsValidationFailureException {
+ JmxMetricsAccessor jmxMetricsAccessor = mock(JmxMetricsAccessor.class);
+ Map<String, Long> values = new HashMap<>();
+ values.put("samza-container-0", 100L);
+ when(jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(), "commit-calls")).thenReturn(values);
+ validator.validate(jmxMetricsAccessor);
+
+ values.put("samza-container-0", -1L);
+ // the mock validator will fail if the commit-calls are less than or equal to 0
+ exception.expect(MetricsValidationFailureException.class);
+ validator.validate(jmxMetricsAccessor);
+ }
+}