You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/01 01:23:55 UTC
git commit: TEZ-784. Add TestDriver to allow cmd line submission of
tests to a cluster (bikas)
Updated Branches:
refs/heads/master f9094d469 -> 00aecc826
TEZ-784. Add TestDriver to allow cmd line submission of tests to a cluster (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/00aecc82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/00aecc82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/00aecc82
Branch: refs/heads/master
Commit: 00aecc826c6a2c5050fee085d579fa7127db099c
Parents: f9094d4
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 31 16:22:55 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 31 16:23:23 2014 -0800
----------------------------------------------------------------------
tez-tests/pom.xml | 11 ++
.../tez/test/FaultToleranceTestRunner.java | 164 +++++++++++++++++
.../java/org/apache/tez/test/SimpleTestDAG.java | 61 +++++++
.../java/org/apache/tez/test/TestDriver.java | 39 ++++
.../org/apache/tez/test/TestFaultTolerance.java | 183 +++++--------------
.../java/org/apache/tez/test/TestInput.java | 38 ++--
.../java/org/apache/tez/test/TestOutput.java | 6 +
.../java/org/apache/tez/test/TestProcessor.java | 30 +--
8 files changed, 363 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
index 7941da1..ea4ec49 100644
--- a/tez-tests/pom.xml
+++ b/tez-tests/pom.xml
@@ -84,6 +84,17 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.tez.test.TestDriver</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
new file mode 100644
index 0000000..fb12352
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+/**
+ * Runs a DAG on a cluster. Starts a TezSession using the default cluster
+ * configuration from the installation, then uses reflection to load the class
+ * named by the first argument. That class must have a static method
+ * createDAG(org.apache.hadoop.conf.Configuration) that returns a DAG. The
+ * Configuration passed to it is read from the file named by the optional
+ * second argument (empty if omitted). The DAG is then run in the session.
+ * main() exits with status 2 if the DAG does not succeed and 1 on bad usage.
+ */
+public class FaultToleranceTestRunner {
+
+ static String TEST_ROOT_DIR = "tmp";
+
+ TezSession tezSession = null;
+ Resource defaultResource = Resource.newInstance(100, 0);
+
+
+ void setup() throws Exception {
+ TezConfiguration tezConf = new TezConfiguration(new YarnConfiguration());
+ FileSystem defaultFs = FileSystem.get(tezConf);
+
+ Path remoteStagingDir = defaultFs.makeQualified(new Path(TEST_ROOT_DIR, String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(tezConf, remoteStagingDir);
+
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+
+ AMConfiguration amConfig = new AMConfiguration(
+ new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+ tezConf, null);
+ TezSessionConfiguration tezSessionConfig =
+ new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("FaultToleranceTestRunner", tezSessionConfig);
+ tezSession.start();
+ }
+
+ void tearDown() throws Exception {
+ if (tezSession != null) {
+ tezSession.stop();
+ }
+ }
+
+ DAG getDAG(String className, String confFilePath) throws Exception {
+ Class<?> clazz = Class.forName(className);
+ Method method = clazz.getMethod("createDAG", Configuration.class);
+
+ Configuration testConf = new Configuration(false);
+ if (confFilePath != null) {
+ Path confPath = new Path(confFilePath);
+ testConf.addResource(confPath);
+ }
+
+ DAG dag = (DAG) method.invoke(null, testConf);
+
+ return dag;
+ }
+
+ boolean run(String className, String confFilePath) throws Exception {
+ setup();
+
+ try {
+ TezSessionStatus status = tezSession.getSessionStatus();
+ while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+ System.out.println("Waiting for session to be ready. Current: " + status);
+ Thread.sleep(500);
+ status = tezSession.getSessionStatus();
+ }
+ if (status == TezSessionStatus.SHUTDOWN) {
+ throw new TezUncheckedException("Unexpected Session shutdown");
+ }
+
+ DAG dag = getDAG(className, confFilePath);
+
+ DAGClient dagClient = tezSession.submitDAG(dag);
+ DAGStatus dagStatus = dagClient.getDAGStatus(null);
+ while (!dagStatus.isCompleted()) {
+ System.out.println("Waiting for dag to complete. Sleeping for 500ms."
+ + " DAG name: " + dag.getName()
+ + " DAG appId: " + dagClient.getApplicationId()
+ + " Current state: " + dagStatus.getState());
+ Thread.sleep(500);
+ dagStatus = dagClient.getDAGStatus(null);
+ }
+
+ if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
+ return true;
+ }
+
+ } finally {
+ tearDown();
+ }
+
+ return false;
+ }
+
+ static void printUsage() {
+ System.err.println(
+ "Usage: " + " FaultToleranceTestRunner <dag-class-name> <test-conf-path>");
+ }
+
+ public static void main(String[] args) throws Exception {
+ String className = null;
+ String confFilePath = null;
+ if (args.length == 1) {
+ className = args[0];
+ } else if (args.length == 2) {
+ className = args[0];
+ confFilePath = args[1];
+ } else {
+ printUsage();
+ System.exit(1);
+ }
+
+ FaultToleranceTestRunner job = new FaultToleranceTestRunner();
+ if (job.run(className, confFilePath)) {
+ System.out.println("Succeeded.");
+ } else {
+ System.out.println("Failed.");
+ System.exit(2);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
new file mode 100644
index 0000000..85c5d8b
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+
+public class SimpleTestDAG {
+ static Resource defaultResource = Resource.newInstance(100, 0);
+ public static String TEZ_SIMPLE_DAG_NUM_TASKS =
+ "tez.simple-test-dag.num-tasks";
+ public static int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+
+ public static DAG createDAG(String name,
+ Configuration conf) throws Exception {
+ byte[] payload = null;
+ int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
+ if (conf != null) {
+ taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
+ payload = TezUtils.createUserPayloadFromConf(conf);
+ }
+ DAG dag = new DAG(name);
+ Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ return dag;
+ }
+
+ public static DAG createDAG(Configuration conf) throws Exception {
+ return createDAG("SimpleTestDAG", conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
new file mode 100644
index 0000000..bf04fd5
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+public class TestDriver {
+
+ public static void main(String argv[]){
+ int exitCode = -1;
+ ProgramDriver pgd = new ProgramDriver();
+ try {
+ pgd.addClass("FaultToleranceTestRunner", FaultToleranceTestRunner.class,
+ "Run different DAGs for fault tolerance testing");
+ exitCode = pgd.run(argv);
+ }
+ catch(Throwable e){
+ e.printStackTrace();
+ }
+
+ System.exit(exitCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 3a43eca..0ce2bed 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
@@ -40,15 +39,11 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -63,7 +58,6 @@ public class TestFaultTolerance {
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestFaultTolerance.class.getName() + "-tmpDir";
- private static Resource defaultResource = Resource.newInstance(100, 0);
private static TezSession tezSession = null;
@BeforeClass
@@ -135,226 +129,133 @@ public class TestFaultTolerance {
Assert.assertEquals(finalState, dagStatus.getState());
}
- ProcessorDescriptor getProcDesc(byte[] payload) {
- return new ProcessorDescriptor(TestProcessor.class.getName()).
- setUserPayload(payload);
- }
-
- InputDescriptor getInputDesc(byte[] payload) {
- return new InputDescriptor(TestInput.class.getName()).
- setUserPayload(payload);
- }
-
- OutputDescriptor getOutputDesc(byte[] payload) {
- return new OutputDescriptor(TestOutput.class.getName()).
- setUserPayload(payload);
- }
-
@Test (timeout=60000)
public void testBasicSuccessScatterGather() throws Exception {
- DAG dag = new DAG("testBasicSuccessScatterGather");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(null))));
+ DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", null);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testBasicSuccessBroadcast() throws Exception {
DAG dag = new DAG("testBasicSuccessBroadcast");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+ Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
+ Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
new EdgeProperty(DataMovementType.BROADCAST,
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(null))));
+ TestOutput.getOutputDesc(null),
+ TestInput.getInputDesc(null))));
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testBasicTaskFailure() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
-
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
- DAG dag = new DAG("testBasicTaskFailure");
- Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(null))));
+ DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testTaskMultipleFailures() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
testConf.setInt(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-
- DAG dag = new DAG("testTaskMultipleFailures");
- Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(null))));
+ DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testTaskMultipleFailuresDAGFail() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
testConf.set(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
- TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), -1);
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), -1);
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-
- DAG dag = new DAG("testTaskMultipleFailuresDAGFail");
- Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(null))));
+ DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailuresDAGFail", testConf);
runDAGAndVerify(dag, DAGStatus.State.FAILED);
}
@Test (timeout=60000)
public void testBasicInputFailureWithExit() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.setBoolean(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
-
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
- DAG dag = new DAG("testBasicInputFailureWithExit");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(payload))));
+ DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testBasicInputFailureWithoutExit() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
-
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
- DAG dag = new DAG("testBasicInputFailureWithoutExit");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(payload))));
+ DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testMultipleInputFailureWithoutExit() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0,1");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
-
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
- DAG dag = new DAG("testMultipleInputFailureWithoutExit");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(payload))));
+ DAG dag = SimpleTestDAG.createDAG("testMultipleInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testMultiVersionInputFailureWithoutExit() throws Exception {
- Configuration testConf = new Configuration();
+ Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
testConf.set(TestInput.getVertexConfName(
TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
testConf.set(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
testConf.setInt(TestInput.getVertexConfName(
- TestInput.TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
- byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-
- DAG dag = new DAG("testMultiVersionInputFailureWithoutExit");
- Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
- Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
- dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2,
- new EdgeProperty(DataMovementType.SCATTER_GATHER,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- getOutputDesc(null),
- getInputDesc(payload))));
+ DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 5d0ac71..b5decb5 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
@@ -72,41 +73,46 @@ public class TestInput implements LogicalInput {
/**
* Enable failure for this logical input
*/
- public static String TEZ_AM_FAILING_INPUT_DO_FAIL =
- "tez.am.failing-input.do-fail";
+ public static String TEZ_FAILING_INPUT_DO_FAIL =
+ "tez.failing-input.do-fail";
/**
* Logical input will exit (and cause task failure) after reporting failure to
* read.
*/
- public static String TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT =
- "tez.am.failing-input.do-fail-and-exit";
+ public static String TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT =
+ "tez.failing-input.do-fail-and-exit";
/**
* Which physical inputs to fail. This is a comma separated list of +ve integers.
* -1 means fail all.
*/
- public static String TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX =
- "tez.am.failing-input.failing-input-index";
+ public static String TEZ_FAILING_INPUT_FAILING_INPUT_INDEX =
+ "tez.failing-input.failing-input-index";
/**
* Up to which version of the above physical inputs to fail. 0 will fail the
* first version. 1 will fail the first and second versions. And so on.
*/
- public static String TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT =
- "tez.am.failing-input.failing-upto-input-attempt";
+ public static String TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT =
+ "tez.failing-input.failing-upto-input-attempt";
/**
* Indices of the tasks in the vertex for which this input will fail. Comma
* separated list of +ve integers. -1 means all tasks. E.g. 0 means the first
* task in the vertex will have failing inputs.
*/
public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX =
- "tez.am.failing-input.failing-task-index";
+ "tez.failing-input.failing-task-index";
/**
* Which task attempts will fail the input. This is a comma separated list of
* +ve integers. -1 means all will fail. E.g. specifying 1 means the first
* attempt will not fail the input but a re-run (the second attempt) will
* trigger input failure. So this can be used to simulate cascading failures.
*/
- public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT =
- "tez.am.failing-input.failing-task-attempt";
+ public static String TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT =
+ "tez.failing-input.failing-task-attempt";
+
+ public static InputDescriptor getInputDesc(byte[] payload) {
+ return new InputDescriptor(TestInput.class.getName()).
+ setUserPayload(payload);
+ }
public int doRead() {
boolean done = true;
@@ -185,9 +191,9 @@ public class TestInput implements LogicalInput {
if (inputContext.getUserPayload() != null) {
String vName = inputContext.getTaskVertexName();
conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
- doFail = conf.getBoolean(getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL, vName), false);
+ doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);
doFailAndExit = conf.getBoolean(
- getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
+ getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
LOG.info("doFail: " + doFail + " doFailAndExit: " + doFailAndExit);
if (doFail) {
for (String failingIndex :
@@ -198,16 +204,16 @@ public class TestInput implements LogicalInput {
}
for (String failingIndex :
conf.getTrimmedStringCollection(
- getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, vName))) {
+ getVertexConfName(TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, vName))) {
LOG.info("Adding failing task attempt: " + failingIndex);
failingTaskAttempts.add(Integer.valueOf(failingIndex));
}
failingInputUpto = conf.getInt(
- getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vName), 0);
+ getVertexConfName(TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vName), 0);
LOG.info("Adding failing input upto: " + failingInputUpto);
for (String failingIndex :
conf.getTrimmedStringCollection(
- getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, vName))) {
+ getVertexConfName(TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, vName))) {
LOG.info("Adding failing input index: " + failingIndex);
failingInputIndices.add(Integer.valueOf(failingIndex));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index bb4f7b5..40e56f1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
@@ -33,6 +34,11 @@ import com.google.common.collect.Lists;
public class TestOutput implements LogicalOutput {
private static final Log LOG = LogFactory.getLog(TestOutput.class);
+ public static OutputDescriptor getOutputDesc(byte[] payload) {
+ return new OutputDescriptor(TestOutput.class.getName()).
+ setUserPayload(payload);
+ }
+
int numOutputs;
TezOutputContext outputContext;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index d539585..c314042 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -59,25 +60,30 @@ public class TestProcessor implements LogicalIOProcessor {
/**
* Enable failure for this processor
*/
- public static String TEZ_AM_FAILING_PROCESSOR_DO_FAIL =
- "tez.am.failing-processor.do-fail";
+ public static String TEZ_FAILING_PROCESSOR_DO_FAIL =
+ "tez.failing-processor.do-fail";
/**
* Time to sleep in the processor in milliseconds.
*/
- public static String TEZ_AM_FAILING_PROCESSOR_SLEEP_MS =
- "tez.am.failing-processor.sleep-ms";
+ public static String TEZ_FAILING_PROCESSOR_SLEEP_MS =
+ "tez.failing-processor.sleep-ms";
/**
* The indices of tasks in the vertex for which the processor will fail. This
* is a comma-separated list of +ve integers. -1 means all fail.
*/
- public static String TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX =
- "tez.am.failing-processor.failing-task-index";
+ public static String TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX =
+ "tez.failing-processor.failing-task-index";
/**
* Up to which attempt of the tasks will fail. Specifying 0 means the first
* attempt will fail. 1 means first and second attempt will fail. And so on.
*/
- public static String TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
- "tez.am.failing-processor.failing-upto-task-attempt";
+ public static String TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
+ "tez.failing-processor.failing-upto-task-attempt";
+
+ public static ProcessorDescriptor getProcDesc(byte[] payload) {
+ return new ProcessorDescriptor(TestProcessor.class.getName()).
+ setUserPayload(payload);
+ }
void throwException(String msg) {
RuntimeException e = new RuntimeException(msg);
@@ -97,19 +103,19 @@ public class TestProcessor implements LogicalIOProcessor {
conf = MRHelpers.createConfFromUserPayload(processorContext
.getUserPayload());
doFail = conf.getBoolean(
- getVertexConfName(TEZ_AM_FAILING_PROCESSOR_DO_FAIL, vName), false);
+ getVertexConfName(TEZ_FAILING_PROCESSOR_DO_FAIL, vName), false);
sleepMs = conf.getLong(
- getVertexConfName(TEZ_AM_FAILING_PROCESSOR_SLEEP_MS, vName), 0);
+ getVertexConfName(TEZ_FAILING_PROCESSOR_SLEEP_MS, vName), 0);
LOG.info("doFail: " + doFail);
if (doFail) {
for (String failingIndex : conf
.getTrimmedStringCollection(
- getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
+ getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
LOG.info("Adding failing task index: " + failingIndex);
failingTaskIndices.add(Integer.valueOf(failingIndex));
}
failingTaskAttemptUpto = conf.getInt(
- getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
+ getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
LOG.info("Adding failing attempt : " + failingTaskAttemptUpto +
" dag: " + processorContext.getDAGName());
}