You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tez.apache.org by zj...@apache.org on 2015/12/16 03:04:05 UTC
tez git commit: TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 42ef63665 -> 685fa742f
TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/685fa742
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/685fa742
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/685fa742
Branch: refs/heads/master
Commit: 685fa742f7042876d8a87544b1e08ce566837e65
Parents: 42ef636
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Dec 16 10:02:13 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Wed Dec 16 10:03:40 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +
.../apache/tez/examples/HashJoinExample.java | 6 ++-
.../org/apache/tez/examples/JoinValidate.java | 6 ++-
.../apache/tez/examples/OrderedWordCount.java | 7 ++--
.../tez/examples/SimpleSessionExample.java | 2 +-
.../tez/examples/SortMergeJoinExample.java | 6 ++-
.../org/apache/tez/examples/TezExampleBase.java | 19 +++++++--
.../java/org/apache/tez/examples/WordCount.java | 3 +-
.../RecoveryServiceWithEventHandlingHook.java | 4 ++
.../java/org/apache/tez/test/TestLocalMode.java | 2 +-
.../java/org/apache/tez/test/TestRecovery.java | 44 ++++++++++++++------
12 files changed, 75 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0236084..a9d1893 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES
TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job.
ALL CHANGES:
+ TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization.
TEZ-2997. Tez UI: Support searches by CallerContext ID for DAGs
TEZ-2996. TestAnalyzer fails in trunk after recovery redesign
TEZ-2987. TestVertexImpl.testTez2684 fails
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f3cfb58..4e82560 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2231,6 +2231,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) {
eventHandler.handle(new VertexEventRouteEvent(getVertexId(), initGeneratedEvents));
}
+ // reset rootInputDescriptor because it may be changed during input initialization.
+ this.rootInputDescriptors = recoveryData.getVertexInitedEvent().getAdditionalInputs();
} else {
initedTime = clock.getTime();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
index 680de35..935ccbc 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java
@@ -157,7 +157,8 @@ public class HashJoinExample extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf),
TextInputFormat.class, hashPath.toUri().toString())
- .groupSplits(!isDisableSplitGrouping()).build());
+ .groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
/**
* This vertex represents that side of the data that will be streamed and
@@ -173,7 +174,8 @@ public class HashJoinExample extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf),
TextInputFormat.class, streamPath.toUri().toString())
- .groupSplits(!isDisableSplitGrouping()).build());
+ .groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
/**
* This vertex represents the join operation. It writes the join output as
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 4883351..186bacd 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -155,14 +155,16 @@ public class JoinValidate extends TezExampleBase {
ForwardingProcessor.class.getName())).addDataSource("lhs",
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
- lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+ lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
setVertexExecutionContext(lhsVertex, getLhsExecutionContext());
Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
ForwardingProcessor.class.getName())).addDataSource("rhs",
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
- rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+ rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
setVertexExecutionContext(rhsVertex, getRhsExecutionContext());
Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index fff7c1b..6596809 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -123,10 +123,11 @@ public class OrderedWordCount extends TezExampleBase {
}
public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath,
- int numPartitions, boolean disableSplitGrouping, String dagName) throws IOException {
+ int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient, String dagName) throws IOException {
DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
- TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build();
+ TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping)
+ .generateSplitsInAM(!isGenerateSplitInClient).build();
DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
TextOutputFormat.class, outputPath).build();
@@ -198,7 +199,7 @@ public class OrderedWordCount extends TezExampleBase {
TezClient tezClient) throws Exception {
DAG dag = createDAG(tezConf, args[0], args[1],
args.length == 3 ? Integer.parseInt(args[2]) : 1, isDisableSplitGrouping(),
- "OrderedWordCount");
+ isGenerateSplitInClient(), "OrderedWordCount");
LOG.info("Running OrderedWordCount");
return runDag(dag, isCountersLog(), LOG);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
index 08a4b12..d555f47 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java
@@ -87,7 +87,7 @@ public class SimpleSessionExample extends TezExampleBase {
for (int i = 0; i < inputPaths.length; ++i) {
DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions,
- isDisableSplitGrouping(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+ isDisableSplitGrouping(), isGenerateSplitInClient(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
LOG.info("Running dag number " + i);
if(runDag(dag, isCountersLog(), LOG) != 0) {
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
index 5a0cbd8..1054e00 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java
@@ -158,7 +158,8 @@ public class SortMergeJoinExample extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf),
TextInputFormat.class, inputPath1.toUri().toString())
- .groupSplits(!isDisableSplitGrouping()).build());
+ .groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
/**
* The other vertex represents the other side of the join. It reads text
@@ -173,7 +174,8 @@ public class SortMergeJoinExample extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf),
TextInputFormat.class, inputPath2.toUri().toString())
- .groupSplits(!isDisableSplitGrouping()).build());
+ .groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build());
/**
* This vertex represents the join operation. It writes the join output as
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
index 281eaa9..6960559 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java
@@ -58,10 +58,12 @@ public abstract class TezExampleBase extends Configured implements Tool {
protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping";
protected static final String LOCAL_MODE = "local";
protected static final String COUNTER_LOG = "counter";
+ protected static final String GENERATE_SPLIT_IN_CLIENT = "generateSplitInClient";
private boolean disableSplitGrouping = false;
private boolean isLocalMode = false;
private boolean isCountersLog = false;
+ private boolean generateSplitInClient = false;
protected boolean isCountersLog() {
return isCountersLog;
@@ -71,11 +73,16 @@ public abstract class TezExampleBase extends Configured implements Tool {
return disableSplitGrouping;
}
+ protected boolean isGenerateSplitInClient() {
+ return generateSplitInClient;
+ }
+
private Options getExtraOptions() {
Options options = new Options();
options.addOption(LOCAL_MODE, false, "run it as local mode");
options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping");
options.addOption(COUNTER_LOG, false , "print counter log");
+ options.addOption(GENERATE_SPLIT_IN_CLIENT, false, "whether generate split in client");
return options;
}
@@ -91,9 +98,11 @@ public abstract class TezExampleBase extends Configured implements Tool {
disableSplitGrouping = true;
}
if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) {
- isCountersLog = true;
+ isCountersLog = true;
+ }
+ if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
+ generateSplitInClient = true;
}
-
return _execute(otherArgs, null, null);
}
@@ -124,7 +133,10 @@ public abstract class TezExampleBase extends Configured implements Tool {
disableSplitGrouping = true;
}
if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) {
- isCountersLog = true;
+ isCountersLog = true;
+ }
+ if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) {
+ generateSplitInClient = true;
}
String[] otherArgs = optionParser.getRemainingArgs();
return _execute(otherArgs, conf, tezClient);
@@ -238,6 +250,7 @@ public abstract class TezExampleBase extends Configured implements Tool {
ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput,"
+ " enable split grouping without this option.");
ps.println("-" + COUNTER_LOG + "\t\t to print counters information");
+ ps.println("-" + GENERATE_SPLIT_IN_CLIENT + "\t\tgenerate input split in client");
ps.println();
ps.println("The Tez example extra options usage syntax is ");
ps.println("example_name [extra_options] [example_parameters]");
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
index 3578267..6149193 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java
@@ -143,7 +143,8 @@ public class WordCount extends TezExampleBase {
// Create the descriptor that describes the input data to Tez. Using MRInput to read text
// data from the given input path. The TextInputFormat is used to read the text data.
DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
- TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()).build();
+ TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping())
+ .generateSplitsInAM(!isGenerateSplitInClient()).build();
// Create a descriptor that describes the output data to Tez. Using MROoutput to write text
// data to the given output path. The TextOutputFormat is used to write the text data.
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
index cec8fbd..8a0f39e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -222,6 +222,10 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
public SimpleShutdownCondition() {
}
+ public HistoryEvent getHistoryEvent() {
+ return this.event;
+ }
+
private String encodeHistoryEvent(HistoryEvent event) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
event.toProtoStream(out);
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
index 3a03739..2a5b65f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java
@@ -230,7 +230,7 @@ public class TestLocalMode {
try {
for (int i=0; i<inputPaths.length; ++i) {
DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], 1,
- false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
+ false, false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session
tezClient.waitTillReady();
System.out.println("Running dag number " + i);
http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index 4f5ef1a..dc26167 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -52,6 +52,7 @@ import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
@@ -70,6 +71,7 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.OrderedWordCount;
+import org.apache.tez.examples.TezExampleBase;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook;
@@ -239,13 +241,15 @@ public class TestRecovery {
// randomly choose half of the test scenario to avoid
// timeout.
if (rand.nextDouble() < 0.5) {
- testOrderedWordCount(shutdownConditions.get(i), true);
+ // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976)
+ testOrderedWordCount(shutdownConditions.get(i), true,
+ shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
}
}
}
private void testOrderedWordCount(SimpleShutdownCondition shutdownCondition,
- boolean enableAutoParallelism) throws Exception {
+ boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
LOG.info("shutdownCondition:" + shutdownCondition.getEventType()
+ ", event=" + shutdownCondition.getEvent());
String inputDirStr = "/tmp/owc-input/";
@@ -276,11 +280,16 @@ public class TestRecovery {
tezConf.setBoolean(
TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
-
OrderedWordCount job = new OrderedWordCount();
- Assert
- .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[] {
- inputDirStr, outputDirStr, "5" }, null) == 0);
+ if (generateSplitInClient) {
+ Assert
+ .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+ "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0);
+ } else {
+ Assert
+ .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{
+ inputDirStr, outputDirStr, "5"}, null) == 0);
+ }
TestTezJobs.verifyOutput(outputDir, remoteFs);
List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser
.readRecoveryEvents(tezConf, job.getAppId(), 1);
@@ -392,13 +401,15 @@ public class TestRecovery {
// randomly choose half of the test scenario to avoid
// timeout.
if (rand.nextDouble() < 0.5) {
- testHashJoinExample(shutdownConditions.get(i), true);
+ // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976)
+ testHashJoinExample(shutdownConditions.get(i), true,
+ shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
}
}
}
private void testHashJoinExample(SimpleShutdownCondition shutdownCondition,
- boolean enableAutoParallelism) throws Exception {
+ boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception {
HashJoinExample hashJoinExample = new HashJoinExample();
TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
@@ -449,10 +460,19 @@ public class TestRecovery {
out1.close();
out2.close();
- String[] args = new String[] {
- "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
- + stagingDirPath.toString(), inPath1.toString(),
- inPath2.toString(), "1", outPath.toString() };
+ String[] args = null;
+ if (generateSplitInClient) {
+ args = new String[]{
+ "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
+ + stagingDirPath.toString(),
+ "-generateSplitInClient",
+ inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
+ } else {
+ args = new String[]{
+ "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
+ + stagingDirPath.toString(),
+ inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
+ }
assertEquals(0, hashJoinExample.run(args));
FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {