You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 19:32:56 UTC
[3/3] git commit: TEZ-515. Send MapReduce split information over RPC
(to AM and to tasks from the AM). (sseth)
TEZ-515. Send MapReduce split information over RPC (to AM and to tasks
from the AM). (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d41471fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d41471fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d41471fc
Branch: refs/heads/master
Commit: d41471fc6d8910a15c86826cf2bf561b2d2b2f3c
Parents: 74d33b6
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 9 10:32:10 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 9 10:32:10 2013 -0700
----------------------------------------------------------------------
.../mapreduce/examples/FilterLinesByWord.java | 6 +-
.../tez/mapreduce/examples/MRRSleepJob.java | 76 ++++--
.../mapreduce/examples/OrderedWordCount.java | 5 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 5 +-
tez-mapreduce/pom.xml | 31 +++
.../mapreduce/split/SplitMetaInfoReaderTez.java | 1 +
.../common/MRInputSplitDistributor.java | 75 ++++++
.../tez/mapreduce/hadoop/InputSplitInfo.java | 70 +++--
.../mapreduce/hadoop/InputSplitInfoDisk.java | 102 +++++++
.../tez/mapreduce/hadoop/InputSplitInfoMem.java | 79 ++++++
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 268 +++++++++++++++++--
.../tez/mapreduce/hadoop/MRJobConfig.java | 7 +
.../hadoop/MultiStageMRConfToTezTranslator.java | 3 +
.../org/apache/tez/mapreduce/input/MRInput.java | 223 ++++++++++-----
.../src/main/proto/MRRuntimeProtos.proto | 40 +++
.../processor/map/TestMapProcessor.java | 7 +-
.../processor/reduce/TestReduceProcessor.java | 7 +-
.../org/apache/tez/mapreduce/YARNRunner.java | 3 +-
18 files changed, 850 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 996e830..be4534b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -64,8 +64,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
@@ -171,8 +171,8 @@ public class FilterLinesByWord {
// Configure the Input for stage1
stage1Vertex.addInput("MRInput",
- new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(MRHelpers
- .createUserPayloadFromConf(stage1Conf)));
+ new InputDescriptor(MRInputLegacy.class.getName())
+ .setUserPayload(MRHelpers.createMRInputPayload(stage1Conf, null)), null);
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index c76ec6f..a1fb14a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -77,8 +77,9 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
@@ -420,7 +421,7 @@ public class MRRSleepJob extends Configured implements Tool {
int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
- long iReduceSleepTime, int iReduceSleepCount)
+ long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS)
throws IOException, YarnException {
@@ -516,15 +517,26 @@ public class MRRSleepJob extends Configured implements Tool {
MRHelpers.doJobClientMagic(finalReduceConf);
}
- InputSplitInfo inputSplitInfo;
- try {
- inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
- remoteStagingDir);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- throw new TezUncheckedException("Could not generate input splits", e);
- } catch (ClassNotFoundException e) {
- throw new TezUncheckedException("Failed to generate input splits", e);
+ InputSplitInfo inputSplitInfo = null;
+ if (writeSplitsToDFS) {
+ LOG.info("Writing splits to DFS");
+ try {
+ inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
+ remoteStagingDir);
+ } catch (InterruptedException e) {
+ throw new TezUncheckedException("Could not generate input splits", e);
+ } catch (ClassNotFoundException e) {
+ throw new TezUncheckedException("Failed to generate input splits", e);
+ }
+ } else {
+ try {
+ LOG.info("Creating in-mem splits");
+ inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
+ } catch (ClassNotFoundException e) {
+ throw new TezUncheckedException("Could not generate input splits", e);
+ } catch (InterruptedException e) {
+ throw new TezUncheckedException("Could not generate input splits", e);
+ }
}
DAG dag = new DAG("MRRSleepJob");
@@ -550,25 +562,40 @@ public class MRRSleepJob extends Configured implements Tool {
List<Vertex> vertices = new ArrayList<Vertex>();
+
+ byte[] mapInputPayload = null;
+ if (writeSplitsToDFS) {
+ mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+ } else {
+ mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, inputSplitInfo.getSplitsProto());
+ }
+
byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapUserPayload),
numMapper, MRHelpers.getMapResource(mapStageConf));
mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
- Map<String, LocalResource> mapLocalResources =
- new HashMap<String, LocalResource>();
- mapLocalResources.putAll(commonLocalResources);
- MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
- mapLocalResources);
- mapVertex.setTaskLocalResources(mapLocalResources);
+
+ if (writeSplitsToDFS) {
+ Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
+ mapLocalResources.putAll(commonLocalResources);
+ MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
+ mapLocalResources);
+ mapVertex.setTaskLocalResources(mapLocalResources);
+ } else {
+ mapVertex.setTaskLocalResources(commonLocalResources);
+ }
+
Map<String, String> mapEnv = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
mapVertex.setTaskEnvironment(mapEnv);
- MRHelpers.addMRInput(mapVertex, mapUserPayload);
+ if (writeSplitsToDFS) {
+ MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
+ } else {
+ MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class);
+ }
vertices.add(mapVertex);
-
-
if (iReduceStagesCount > 0
&& numIReducer > 0) {
@@ -692,7 +719,8 @@ public class MRRSleepJob extends Configured implements Tool {
" [-irs numIntermediateReducerStages]" +
" [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
" [-irt intermediateReduceSleepTime]" +
- " [-recordt recordSleepTime (msec)]");
+ " [-recordt recordSleepTime (msec)]" +
+ " [-writeSplitsToDfs (false)/true]");
ToolRunner.printGenericCommandUsage(System.err);
return 2;
}
@@ -702,6 +730,7 @@ public class MRRSleepJob extends Configured implements Tool {
iReduceSleepTime=1;
int mapSleepCount = 1, reduceSleepCount = 1, iReduceSleepCount = 1;
int iReduceStagesCount = 1;
+ boolean writeSplitsToDfs = false;
for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) {
@@ -728,6 +757,9 @@ public class MRRSleepJob extends Configured implements Tool {
else if (args[i].equals("-recordt")) {
recSleepTime = Long.parseLong(args[++i]);
}
+ else if (args[i].equals("-writeSplitsToDfs")) {
+ writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+ }
}
if (numIReducer > 0 && numReducer <= 0) {
@@ -762,7 +794,7 @@ public class MRRSleepJob extends Configured implements Tool {
DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
numMapper, numReducer, iReduceStagesCount, numIReducer,
mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
- iReduceSleepTime, iReduceSleepCount);
+ iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs);
conf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
MRHelpers.getMRAMJavaOpts(conf));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 44cf3da..4093b85 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,9 +70,9 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -201,6 +201,7 @@ public class OrderedWordCount {
List<Vertex> vertices = new ArrayList<Vertex>();
byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+ byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapPayload),
inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(mapStageConf));
@@ -215,7 +216,7 @@ public class OrderedWordCount {
Map<String, String> mapEnv = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
mapVertex.setTaskEnvironment(mapEnv);
- MRHelpers.addMRInput(mapVertex, mapPayload);
+ MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
vertices.add(mapVertex);
Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index d9d56bf..2b001b4 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -76,9 +76,9 @@ import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -399,13 +399,14 @@ public class TestMRRJobsDAGApi {
remoteStagingDir);
byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Conf, null);
byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
DAG dag = new DAG("testMRRSleepJobDagSubmit");
Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(stage1Payload),
inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
- MRHelpers.addMRInput(stage1Vertex, stage1Payload);
+ MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, null);
Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(
MRHelpers.createUserPayloadFromConf(stage2Conf)),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/pom.xml b/tez-mapreduce/pom.xml
index f9755e0..5298195 100644
--- a/tez-mapreduce/pom.xml
+++ b/tez-mapreduce/pom.xml
@@ -91,6 +91,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -99,6 +103,33 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <protocCommand>${protoc.path}</protocCommand>
+ <imports>
+ <param>${basedir}/src/main/proto</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/proto</directory>
+ <includes>
+ <include>MRRuntimeProtos.proto</include>
+ </includes>
+ </source>
+ <output>${project.build.directory}/generated-sources/java</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0705796..6946001 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -48,6 +48,7 @@ public class SplitMetaInfoReaderTez {
public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
+
// Forked from the MR variant so that the metaInfo file as well as the split
// file can be read from local fs - relying on these files being localized.
public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
new file mode 100644
index 0000000..c35a450
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+
+import com.google.common.collect.Lists;
+
+public class MRInputSplitDistributor implements TezRootInputInitializer {
+
+ private static final Log LOG = LogFactory
+ .getLog(MRInputSplitDistributor.class);
+
+ public MRInputSplitDistributor() {
+ }
+
+ private MRSplitsProto splitProto;
+
+ @Override
+ public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
+ throws IOException {
+
+ MRUserPayloadProto userPayloadProto = MRHelpers.parseMRPayload(rootInputContext.getUserPayload());
+
+ this.splitProto = userPayloadProto.getSplits();
+
+ MRUserPayloadProto.Builder updatedPayloadBuilder = MRUserPayloadProto.newBuilder(userPayloadProto);
+ updatedPayloadBuilder.clearSplits();
+
+ List<Event> events = Lists.newArrayListWithCapacity(this.splitProto.getSplitsCount() + 1);
+ RootInputUpdatePayloadEvent updatePayloadEvent = new RootInputUpdatePayloadEvent(
+ updatedPayloadBuilder.build().toByteArray());
+
+ events.add(updatePayloadEvent);
+ int count = 0;
+ for (MRSplitProto mrSplit : this.splitProto.getSplitsList()) {
+ // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
+ // raw array.
+ RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
+ count++, mrSplit.toByteArray());
+ events.add(diEvent);
+ }
+
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index 3eef766..34ec56f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -20,66 +20,56 @@ package org.apache.tez.mapreduce.hadoop;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+// TODO Fix this to be more usable. Interface is broken since half the methods apply to only a specific type.
/**
- * Information obtained by using the InputFormat class to generate
- * the required InputSplit information files that in turn can be used to
- * setup a DAG.
- *
- * The splitsFile and splitsMetaInfoFile need to be provided as LocalResources
- * to the vertex in question. The numTasks represents the parallelism for
- * the vertex and the taskLocationHints define the possible nodes on which the
- * tasks should be run based on the location of the splits that will be
- * processed by each task.
+ * Provides information about input splits.<p>
+ *
+ * getSplitsFile and getSplitsMetaInfoFile apply only when generating splits to
+ * disk; getSplitsProto applies only when generating splits to memory.
+ *
*/
-public class InputSplitInfo {
+@Private
+@Unstable
+public interface InputSplitInfo {
- /// Splits file
- private final Path splitsFile;
- /// Meta info file for all the splits information
- private final Path splitsMetaInfoFile;
- /// Location hints to determine where to run the tasks
- private final List<TaskLocationHint> taskLocationHints;
- /// The num of tasks - same as number of splits generated.
- private final int numTasks;
-
- public InputSplitInfo(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
- List<TaskLocationHint> taskLocationHints) {
- this.splitsFile = splitsFile;
- this.splitsMetaInfoFile = splitsMetaInfoFile;
- this.taskLocationHints = taskLocationHints;
- this.numTasks = numTasks;
+ public enum Type {
+ DISK, MEM
}
-
/**
* Get the TaskLocationHints for each task
*/
- public List<TaskLocationHint> getTaskLocationHints() {
- return taskLocationHints;
- }
+ public abstract List<TaskLocationHint> getTaskLocationHints();
/**
* Get the path to the splits meta info file
*/
- public Path getSplitsMetaInfoFile() {
- return splitsMetaInfoFile;
- }
+ public abstract Path getSplitsMetaInfoFile();
/**
* Get the path to the splits file
*/
- public Path getSplitsFile() {
- return splitsFile;
- }
+ public abstract Path getSplitsFile();
/**
+ * Get the splits proto
+ */
+ public abstract MRSplitsProto getSplitsProto();
+
+ /**
* Get the number of splits that were generated. Same as number of tasks that
* should be run for the vertex processing these set of splits.
*/
- public int getNumTasks() {
- return numTasks;
- }
-
-}
+ public abstract int getNumTasks();
+
+ /**
+ * Get the {@link Type} of the InputSplitInfo
+ */
+ public abstract Type getType();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
new file mode 100644
index 0000000..6bb3448
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+/**
+ * Information obtained by using the InputFormat class to generate
+ * the required InputSplit information files that in turn can be used to
+ * setup a DAG.
+ *
+ * The splitsFile and splitsMetaInfoFile need to be provided as LocalResources
+ * to the vertex in question. The numTasks represents the parallelism for
+ * the vertex and the taskLocationHints define the possible nodes on which the
+ * tasks should be run based on the location of the splits that will be
+ * processed by each task.
+ *
+ * The getSplitsProto method is not supported by this implementation.
+ */
+public class InputSplitInfoDisk implements InputSplitInfo {
+
+ /// Splits file
+ private Path splitsFile;
+ /// Meta info file for all the splits information
+ private Path splitsMetaInfoFile;
+ /// Location hints to determine where to run the tasks
+ private List<TaskLocationHint> taskLocationHints;
+ /// The num of tasks - same as number of splits generated.
+ private int numTasks;
+
+ public InputSplitInfoDisk(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
+ List<TaskLocationHint> taskLocationHints) {
+ this.splitsFile = splitsFile;
+ this.splitsMetaInfoFile = splitsMetaInfoFile;
+ this.taskLocationHints = taskLocationHints;
+ this.numTasks = numTasks;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tez.mapreduce.hadoop.InputSplitInfo#getTaskLocationHints()
+ */
+ @Override
+ public List<TaskLocationHint> getTaskLocationHints() {
+ return taskLocationHints;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tez.mapreduce.hadoop.InputSplitInfo#getSplitsMetaInfoFile()
+ */
+ @Override
+ public Path getSplitsMetaInfoFile() {
+ return splitsMetaInfoFile;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tez.mapreduce.hadoop.InputSplitInfo#getSplitsFile()
+ */
+ @Override
+ public Path getSplitsFile() {
+ return splitsFile;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tez.mapreduce.hadoop.InputSplitInfo#getNumTasks()
+ */
+ @Override
+ public int getNumTasks() {
+ return numTasks;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.DISK;
+ }
+
+ @Override
+ public MRSplitsProto getSplitsProto() {
+ throw new UnsupportedOperationException("Not supported for Type: "
+ + getType());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
new file mode 100644
index 0000000..e6be735
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+/**
+ * Represents InputSplitInfo for splits generated to memory.<p>
+ *
+ * Since splits are generated in memory, the getSplitsMetaInfoFile and
+ * getSplitsFile methods are not supported.
+ *
+ */
+public class InputSplitInfoMem implements InputSplitInfo {
+
+ private final MRSplitsProto splitsProto;
+ private final List<TaskLocationHint> taskLocationHints;
+ private final int numTasks;
+
+ public InputSplitInfoMem(MRSplitsProto splitsProto,
+ List<TaskLocationHint> taskLocationHints, int numTasks) {
+ this.splitsProto = splitsProto;
+ this.taskLocationHints = taskLocationHints;
+ this.numTasks = numTasks;
+ }
+
+ @Override
+ public List<TaskLocationHint> getTaskLocationHints() {
+ return this.taskLocationHints;
+ }
+
+ @Override
+ public Path getSplitsMetaInfoFile() {
+ throw new UnsupportedOperationException("Not supported for Type: "
+ + getType());
+ }
+
+ @Override
+ public Path getSplitsFile() {
+ throw new UnsupportedOperationException("Not supported for Type: "
+ + getType());
+ }
+
+ @Override
+ public int getNumTasks() {
+ return this.numTasks;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.MEM;
+ }
+
+ @Override
+ public MRSplitsProto getSplitsProto() {
+ return this.splitsProto;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 7c79a76..5271069 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -18,6 +18,7 @@
package org.apache.tez.mapreduce.hadoop;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
@@ -37,12 +38,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
@@ -66,12 +72,21 @@ import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
public class MRHelpers {
private static final Log LOG = LogFactory.getLog(MRHelpers.class);
+ static final int SPLIT_SERIALIZED_LENGTH_ESTIMATE = 40;
static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
"job.splitmetainfo";
@@ -126,6 +141,24 @@ public class MRHelpers {
}
}
+ /**
+  * Computes the mapreduce-API input splits for a job and returns them sorted
+  * with {@link InputSplitComparator}.
+  *
+  * @param jobContext supplies the Configuration and the InputFormat class
+  * @return the sorted splits produced by the job's InputFormat
+  * @throws ClassNotFoundException if the InputFormat class cannot be resolved
+  * @throws IOException propagated from the InputFormat
+  * @throws InterruptedException propagated from the InputFormat
+  */
+ private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(
+     JobContext jobContext) throws ClassNotFoundException, IOException,
+     InterruptedException {
+   Configuration configuration = jobContext.getConfiguration();
+   InputFormat<?, ?> inputFormat = ReflectionUtils.newInstance(
+       jobContext.getInputFormatClass(), configuration);
+   List<org.apache.hadoop.mapreduce.InputSplit> splitList = inputFormat
+       .getSplits(jobContext);
+
+   // Copy the list into an exactly-sized array so it can be sorted in place.
+   org.apache.hadoop.mapreduce.InputSplit[] sortedSplits =
+       new org.apache.hadoop.mapreduce.InputSplit[splitList.size()];
+   sortedSplits = splitList.toArray(sortedSplits);
+
+   // Order the splits by size so that the biggest go first (ordering is
+   // defined by InputSplitComparator).
+   Arrays.sort(sortedSplits, new InputSplitComparator());
+   return sortedSplits;
+ }
+
/**
* Generate new-api mapreduce InputFormat splits
* @param jobContext JobContext required by InputFormat
@@ -139,23 +172,13 @@ public class MRHelpers {
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- private static InputSplitInfo writeNewSplits(JobContext jobContext,
+ private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
Path inputSplitDir) throws IOException, InterruptedException,
ClassNotFoundException {
+
+ org.apache.hadoop.mapreduce.InputSplit[] splits = generateNewSplits(jobContext);
+
Configuration conf = jobContext.getConfiguration();
- InputFormat<?, ?> input =
- ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
-
- List<org.apache.hadoop.mapreduce.InputSplit> array =
- input.getSplits(jobContext);
- org.apache.hadoop.mapreduce.InputSplit[] splits =
- (org.apache.hadoop.mapreduce.InputSplit[])
- array.toArray(
- new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
-
- // sort the splits into order based on size, so that the biggest
- // go first
- Arrays.sort(splits, new InputSplitComparator());
JobSplitWriter.createSplitFiles(inputSplitDir, conf,
inputSplitDir.getFileSystem(conf), splits);
@@ -168,12 +191,22 @@ public class MRHelpers {
Arrays.asList(splits[i].getLocations())), null));
}
- return new InputSplitInfo(
+ return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints);
}
+ /**
+  * Computes the mapred-API input splits for a job and returns them sorted
+  * with {@link OldInputSplitComparator}.
+  *
+  * @param jobConf supplies the InputFormat and the requested number of map
+  *          tasks that is passed to it
+  * @return the sorted splits produced by the job's InputFormat
+  * @throws IOException propagated from the InputFormat
+  */
+ private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
+     JobConf jobConf) throws IOException {
+   org.apache.hadoop.mapred.InputFormat<?, ?> oldFormat = jobConf
+       .getInputFormat();
+   org.apache.hadoop.mapred.InputSplit[] sortedSplits = oldFormat.getSplits(
+       jobConf, jobConf.getNumMapTasks());
+
+   // Order the splits by size so that the biggest go first (ordering is
+   // defined by OldInputSplitComparator).
+   Arrays.sort(sortedSplits, new OldInputSplitComparator());
+   return sortedSplits;
+ }
+
/**
* Generate old-api mapred InputFormat splits
* @param jobConf JobConf required by InputFormat class
@@ -185,13 +218,11 @@ public class MRHelpers {
*
* @throws IOException
*/
- private static InputSplitInfo writeOldSplits(JobConf jobConf,
+ private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
Path inputSplitDir) throws IOException {
- org.apache.hadoop.mapred.InputSplit[] splits =
- jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks());
- // sort the splits into order based on size, so that the biggest
- // go first
- Arrays.sort(splits, new OldInputSplitComparator());
+
+ org.apache.hadoop.mapred.InputSplit[] splits = generateOldSplits(jobConf);
+
JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
inputSplitDir.getFileSystem(jobConf), splits);
@@ -203,7 +234,7 @@ public class MRHelpers {
Arrays.asList(splits[i].getLocations())), null));
}
- return new InputSplitInfo(
+ return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
splits.length, locationHints);
@@ -235,11 +266,12 @@ public class MRHelpers {
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- public static InputSplitInfo generateInputSplits(Configuration conf,
+ public static InputSplitInfoDisk generateInputSplits(Configuration conf,
Path inputSplitsDir) throws IOException, InterruptedException,
ClassNotFoundException {
Job job = Job.getInstance(conf);
JobConf jobConf = new JobConf(conf);
+ conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
if (jobConf.getUseNewMapper()) {
LOG.info("Generating new input splits"
+ ", splitsDir=" + inputSplitsDir.toString());
@@ -251,6 +283,112 @@ public class MRHelpers {
}
}
+ /**
+  * Generates input splits and stores them, serialized, in an
+  * {@link MRSplitsProto} that is wrapped by the returned
+  * {@link InputSplitInfoMem}.
+  *
+  * @param conf
+  *          an instance of Configuration which is used to determine whether
+  *          the mapred or mapreduce API is being used. This Configuration
+  *          instance should also contain adequate information to be able to
+  *          generate splits - like the InputFormat being used and related
+  *          configuration.
+  * @return an instance of {@link InputSplitInfoMem} which supports a subset of
+  *         the APIs defined on {@link InputSplitInfo}
+  * @throws IOException
+  * @throws ClassNotFoundException
+  * @throws InterruptedException
+  */
+ public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf)
+     throws IOException, ClassNotFoundException, InterruptedException {
+
+   JobConf jobConf = new JobConf(conf);
+   final InputSplitInfoMem splitInfoMem;
+   if (!jobConf.getUseNewMapper()) {
+     LOG.info("Generating mapred api input splits");
+     org.apache.hadoop.mapred.InputSplit[] oldSplits = generateOldSplits(jobConf);
+     splitInfoMem = createSplitsProto(oldSplits);
+   } else {
+     LOG.info("Generating mapreduce api input splits");
+     Job job = Job.getInstance(conf);
+     org.apache.hadoop.mapreduce.InputSplit[] newSplits = generateNewSplits(job);
+     SerializationFactory factory = new SerializationFactory(
+         job.getConfiguration());
+     splitInfoMem = createSplitsProto(newSplits, factory);
+   }
+
+   int serializedSize = splitInfoMem.getSplitsProto().getSerializedSize();
+   LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+       + serializedSize);
+   return splitInfoMem;
+ }
+
+ private static InputSplitInfoMem createSplitsProto(
+ org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+ SerializationFactory serializationFactory) throws IOException,
+ InterruptedException {
+ MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+ List<TaskLocationHint> locationHints = Lists
+ .newArrayListWithCapacity(newSplits.length);
+ for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
+ splitsBuilder.addSplits(createSplitProto(newSplit, serializationFactory));
+ locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
+ .asList(newSplit.getLocations())), null));
+ }
+ return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
+ newSplits.length);
+ }
+
+ /**
+  * Serializes a single mapreduce-API split into an {@link MRSplitProto}
+  * carrying the split's class name and its serialized bytes.
+  *
+  * @param newSplit the split to serialize
+  * @param serializationFactory used to look up a serializer for the split's
+  *          runtime class
+  * @return the populated proto
+  * @throws IOException if no serializer is available for the split class or
+  *           serialization fails
+  */
+ private static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
+     T newSplit, SerializationFactory serializationFactory)
+     throws IOException, InterruptedException {
+   MRSplitProto.Builder builder = MRSplitProto
+       .newBuilder();
+
+   builder.setSplitClassName(newSplit.getClass().getName());
+
+   @SuppressWarnings("unchecked")
+   Serializer<T> serializer = serializationFactory
+       .getSerializer((Class<T>) newSplit.getClass());
+   if (serializer == null) {
+     // Fail with a descriptive error instead of a NullPointerException.
+     throw new IOException("No serializer found for InputSplit class: ["
+         + newSplit.getClass().getName() + "]");
+   }
+   ByteString.Output out = ByteString
+       .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
+   serializer.open(out);
+   try {
+     serializer.serialize(newSplit);
+   } finally {
+     // Close before reading the bytes so that serializers which buffer
+     // internally have pushed everything into 'out', and so the serializer
+     // is released even when serialize() fails.
+     serializer.close();
+   }
+   // TODO MR Compat: Check against max block locations per split.
+   ByteString splitBs = out.toByteString();
+   builder.setSplitBytes(splitBs);
+
+   return builder.build();
+ }
+
+ private static InputSplitInfoMem createSplitsProto(
+ org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+ MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+ List<TaskLocationHint> locationHints = Lists
+ .newArrayListWithCapacity(oldSplits.length);
+ for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
+ splitsBuilder.addSplits(createSplitProto(oldSplit));
+ locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
+ .asList(oldSplit.getLocations())), null));
+ }
+ return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
+ oldSplits.length);
+ }
+
+ /**
+  * Serializes a single mapred-API split into an {@link MRSplitProto} by
+  * invoking the split's own {@code write} method.
+  *
+  * @param oldSplit the split to serialize
+  * @return proto carrying the split's class name and its written bytes
+  */
+ private static MRSplitProto createSplitProto(
+     org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
+   String splitClassName = oldSplit.getClass().getName();
+
+   ByteString.Output byteSink = ByteString
+       .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
+   oldSplit.write(new DataOutputStream(byteSink));
+
+   return MRSplitProto.newBuilder()
+       .setSplitClassName(splitClassName)
+       .setSplitBytes(byteSink.toByteString())
+       .build();
+ }
+
private static String getChildLogLevel(Configuration conf, boolean isMap) {
if (isMap) {
return conf.get(
@@ -460,6 +598,12 @@ public class MRHelpers {
throws IOException {
return TezUtils.createUserPayloadFromConf(conf);
}
+
+ /**
+  * Converts a Configuration into a {@link ByteString} by delegating to
+  * {@code TezUtils.createByteStringFromConf}.
+  */
+ @LimitedPrivate("Hive, Pig")
+ public static ByteString createByteStringFromConf(Configuration conf)
+     throws IOException {
+   ByteString confBytes = TezUtils.createByteStringFromConf(conf);
+   return confBytes;
+ }
@LimitedPrivate("Hive, Pig")
@Unstable
@@ -468,6 +612,29 @@ public class MRHelpers {
return TezUtils.createConfFromUserPayload(bb);
}
+ /**
+  * Rebuilds a Configuration from a {@link ByteString} by delegating to
+  * {@code TezUtils.createConfFromByteString}.
+  */
+ @LimitedPrivate("Hive, Pig")
+ public static Configuration createConfFromByteString(ByteString bs)
+     throws IOException {
+   Configuration restored = TezUtils.createConfFromByteString(bs);
+   return restored;
+ }
+
+ /**
+  * Builds the serialized {@link MRUserPayloadProto} for an MR input.
+  *
+  * @param conf configuration to embed as bytes
+  * @param mrSplitProto splits to embed; skipped when {@code null}
+  * @return the payload proto serialized to a byte array
+  */
+ public static byte[] createMRInputPayload(Configuration conf,
+     MRSplitsProto mrSplitProto) throws IOException {
+   MRUserPayloadProto.Builder payload = MRUserPayloadProto.newBuilder()
+       .setConfigurationBytes(createByteStringFromConf(conf));
+   if (mrSplitProto != null) {
+     payload.setSplits(mrSplitProto);
+   }
+   // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
+   // more efficient.
+   MRUserPayloadProto built = payload.build();
+   return built.toByteArray();
+ }
+
+ public static MRUserPayloadProto parseMRPayload(byte[] bytes) throws IOException {
+ return MRUserPayloadProto.parseFrom(bytes);
+ }
+
/**
* Update provided localResources collection with the required local
* resources needed by MapReduce tasks with respect to Input splits.
@@ -666,7 +833,7 @@ public class MRHelpers {
return mrAppMasterAdminOptions.trim()
+ " " + mrAppMasterUserOptions.trim();
}
-
+
/**
* Convenience method to add an MR Input to the specified vertex. The name of
* the Input is "MRInput" </p>
@@ -675,11 +842,13 @@ public class MRHelpers {
*
* @param vertex
* @param userPayload
+ * @param initClazz class to init the input in the AM
*/
- public static void addMRInput(Vertex vertex, byte[] userPayload) {
+ public static void addMRInput(Vertex vertex, byte[] userPayload,
+ Class<? extends TezRootInputInitializer> initClazz) {
InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
.setUserPayload(userPayload);
- vertex.addInput("MRInput", id);
+ vertex.addInput("MRInput", id, initClazz);
}
/**
@@ -696,5 +865,52 @@ public class MRHelpers {
.setUserPayload(userPayload);
vertex.addOutput("MROutput", od);
}
+
+ @SuppressWarnings("unchecked")
+ public static InputSplit createOldFormatSplitFromUserPayload(
+ MRSplitProto splitProto, SerializationFactory serializationFactory)
+ throws IOException {
+ // This may not need to use serialization factory, since OldFormat
+ // always uses Writable to write splits.
+ Preconditions.checkNotNull(splitProto, "splitProto cannot be null");
+ String className = splitProto.getSplitClassName();
+ Class<InputSplit> clazz;
+
+ try {
+ clazz = (Class<InputSplit>) Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
+ }
+
+ Deserializer<InputSplit> deserializer = serializationFactory
+ .getDeserializer(clazz);
+ deserializer.open(splitProto.getSplitBytes().newInput());
+ InputSplit inputSplit = deserializer.deserialize(null);
+ deserializer.close();
+ return inputSplit;
+ }
+ @SuppressWarnings("unchecked")
+ public static org.apache.hadoop.mapreduce.InputSplit createNewFormatSplitFromUserPayload(
+ MRSplitProto splitProto, SerializationFactory serializationFactory)
+ throws IOException {
+ Preconditions.checkNotNull(splitProto, "splitProto must be specified");
+ String className = splitProto.getSplitClassName();
+ Class<org.apache.hadoop.mapreduce.InputSplit> clazz;
+
+ try {
+ clazz = (Class<org.apache.hadoop.mapreduce.InputSplit>) Class
+ .forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
+ }
+
+ Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = serializationFactory
+ .getDeserializer(clazz);
+ deserializer.open(splitProto.getSplitBytes().newInput());
+ org.apache.hadoop.mapreduce.InputSplit inputSplit = deserializer
+ .deserialize(null);
+ deserializer.close();
+ return inputSplit;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index a42597e..6951261 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -19,11 +19,15 @@ package org.apache.tez.mapreduce.hadoop;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface MRJobConfig {
+ @Private
+ static final String MR_TEZ_PREFIX = "mapreduce.tez.";
+
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
@@ -650,6 +654,9 @@ public interface MRJobConfig {
"mrr.vertex.";
public static final String VERTEX_NAME = "mapreduce.task.vertex.name";
+
+ public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
+ public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
// Stage specific properties
// Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index b07b04b..9f1a2c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -220,6 +220,9 @@ public class MultiStageMRConfToTezTranslator {
// Assuming no 0 map jobs, and the first stage is always a map.
int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
+ // Read split info from HDFS
+ conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+
// Setup Tez partitioner class
conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ef0de5c..d8f3709 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -19,6 +19,9 @@ package org.apache.tez.mapreduce.input;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,16 +45,20 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;
import com.google.common.base.Preconditions;
@@ -68,9 +75,12 @@ public class MRInput implements LogicalInput {
private static final Log LOG = LogFactory.getLog(MRInput.class);
-
+ private final Lock rrLock = new ReentrantLock();
+ Condition rrInited = rrLock.newCondition();
private TezInputContext inputContext;
+ private boolean eventReceived = false;
+
private JobConf jobConf;
private Configuration incrementalConf;
private boolean recordReaderCreated = false;
@@ -82,31 +92,31 @@ public class MRInput implements LogicalInput {
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
@SuppressWarnings("rawtypes")
- private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
- protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+ private volatile org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+ protected volatile org.apache.hadoop.mapreduce.InputSplit newInputSplit;
@SuppressWarnings("rawtypes")
private InputFormat oldInputFormat;
@SuppressWarnings("rawtypes")
- protected RecordReader oldRecordReader;
+ protected volatile RecordReader oldRecordReader;
+ private volatile InputSplit oldInputSplit;
protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
private TezCounter inputRecordCounter;
private TezCounter fileInputByteCounter;
private List<Statistics> fsStats;
-
+
@Override
public List<Event> initialize(TezInputContext inputContext) throws IOException {
this.inputContext = inputContext;
- Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+ MRUserPayloadProto mrUserPayload = MRHelpers.parseMRPayload(inputContext.getUserPayload());
+ Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
+ "All split information not expected in MRInput");
+ Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);
- // Read split information.
- TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
- TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
- this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
- thisTaskMetaInfo.getStartOffset());
+
// TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
// theory, can be used by the MapProcessor, ReduceProcessor or a custom
@@ -115,69 +125,146 @@ public class MRInput implements LogicalInput {
this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
useNewApi = this.jobConf.getUseNewMapper();
-
- if (useNewApi) {
- TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
- Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
- try {
- inputFormatClazz = taskAttemptContext.getInputFormatClass();
- } catch (ClassNotFoundException e) {
- throw new IOException("Unable to instantiate InputFormat class", e);
+ boolean viaEvents = conf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+ MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
+ LOG.info("Using New mapreduce API: " + useNewApi
+ + ", split information from events: " + viaEvents);
+
+ if (viaEvents) {
+ if (useNewApi) {
+ setupNewInputFormat();
+ } else {
+ setupOldInputFormat();
}
- newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+ } else {
+ // Read split information.
+ TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+ TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
+ .getTaskIndex()];
+ this.splitMetaInfo = new TaskSplitIndex(
+ thisTaskMetaInfo.getSplitLocation(),
+ thisTaskMetaInfo.getStartOffset());
+ if (useNewApi) {
+ setupNewInputFormat();
+ newInputSplit = getNewSplitDetailsFromDisk(splitMetaInfo);
+ setupNewRecordReader();
+ } else {
+ setupOldInputFormat();
+ oldInputSplit = getOldSplitDetailsFromDisk(splitMetaInfo);
+ setupOldRecordReader();
+ }
- newInputSplit = getNewSplitDetails(splitMetaInfo);
+ }
- List<Statistics> matchedStats = null;
- if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
- matchedStats = Utils.getFsStatistics(
- ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
- newInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
- try {
- newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
- newRecordReader.initialize(newInputSplit, taskAttemptContext);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted while creating record reader", e);
- }
- } else { // OLD API
- oldInputFormat = this.jobConf.getInputFormat();
- InputSplit oldInputSplit =
- getOldSplitDetails(splitMetaInfo);
-
-
- List<Statistics> matchedStats = null;
- if (oldInputSplit instanceof FileSplit) {
- matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
- }
- fsStats = matchedStats;
-
- long bytesInPrev = getInputBytes();
- oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
- this.jobConf, new MRReporter(inputContext, oldInputSplit));
- long bytesInCurr = getInputBytes();
- fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
- setIncrementalConfigParams(oldInputSplit);
- }
+ LOG.info("Initialzed MRInput: " + inputContext.getSourceVertexName());
return null;
}
+
+
+ /**
+  * Caches the mapred-API InputFormat obtained from the job configuration.
+  */
+ private void setupOldInputFormat() {
+ oldInputFormat = this.jobConf.getInputFormat();
+ }
+
+ /**
+  * Creates the mapred-API RecordReader for the already-resolved split.
+  * File system statistics are looked up only for {@link FileSplit}s; the
+  * change in {@code getInputBytes()} across reader creation is added to the
+  * file input byte counter.
+  *
+  * @throws IOException propagated from statistics lookup or reader creation
+  */
+ private void setupOldRecordReader() throws IOException {
+   Preconditions.checkNotNull(oldInputSplit, "Input split hasn't yet been setup");
+   if (oldInputSplit instanceof FileSplit) {
+     FileSplit fileSplit = (FileSplit) oldInputSplit;
+     fsStats = Utils.getFsStatistics(fileSplit.getPath(), this.jobConf);
+   } else {
+     fsStats = null;
+   }
+
+   // Measure the bytes read while the reader is being opened.
+   long bytesBefore = getInputBytes();
+   oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+       this.jobConf, new MRReporter(inputContext, oldInputSplit));
+   long bytesAfter = getInputBytes();
+   fileInputByteCounter.increment(bytesAfter - bytesBefore);
+
+   setIncrementalConfigParams(oldInputSplit);
+ }
+
+ /**
+  * Creates the task attempt context and instantiates the mapreduce-API
+  * InputFormat that it names.
+  *
+  * @throws IOException if the InputFormat class cannot be found
+  */
+ private void setupNewInputFormat() throws IOException {
+   taskAttemptContext = createTaskAttemptContext();
+   try {
+     Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> formatClass =
+         taskAttemptContext.getInputFormatClass();
+     newInputFormat = ReflectionUtils.newInstance(formatClass, this.jobConf);
+   } catch (ClassNotFoundException e) {
+     throw new IOException("Unable to instantiate InputFormat class", e);
+   }
+ }
+
+ /**
+  * Creates and initializes the mapreduce-API RecordReader for the
+  * already-resolved split. File system statistics are looked up only for
+  * file-based splits.
+  *
+  * @throws IOException propagated from statistics lookup or reader creation,
+  *           or wrapping an InterruptedException raised while creating the
+  *           reader
+  */
+ private void setupNewRecordReader() throws IOException {
+   Preconditions.checkNotNull(newInputSplit, "Input split hasn't yet been setup");
+   if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+     org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit =
+         (org.apache.hadoop.mapreduce.lib.input.FileSplit) newInputSplit;
+     fsStats = Utils.getFsStatistics(fileSplit.getPath(), this.jobConf);
+   } else {
+     fsStats = null;
+   }
+
+   try {
+     newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+     newRecordReader.initialize(newInputSplit, taskAttemptContext);
+   } catch (InterruptedException e) {
+     throw new IOException("Interrupted while creating record reader", e);
+   }
+ }
+
@Override
public KeyValueReader getReader() throws IOException {
Preconditions
.checkState(recordReaderCreated == false,
"Only a single instance of record reader can be created for this input.");
recordReaderCreated = true;
+ if (newRecordReader == null && oldRecordReader == null) {
+ rrLock.lock();
+ try {
+ LOG.info("Awaiting RecordReader initialization");
+ try {
+ rrInited.await();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted awaiting RecordReader setup", e);
+ }
+ } finally {
+ rrLock.unlock();
+ }
+ }
+ LOG.info("Creating reader for MRInput: " + inputContext.getSourceVertexName());
return new MRInputKVReader();
}
-
@Override
- public void handleEvents(List<Event> inputEvents) {
- // Not expecting any events at the moment.
+ public void handleEvents(List<Event> inputEvents) throws Exception {
+ if (eventReceived || inputEvents.size() != 1) {
+ throw new IllegalStateException(
+ "MRInput expects only a single input. Received: current eventListSize: "
+ + inputEvents.size() + "Received previous input: "
+ + eventReceived);
+ }
+ Event event = inputEvents.iterator().next();
+ MRSplitProto splitProto = MRSplitProto
+ .parseFrom(((RootInputDataInformationEvent) event).getUserPayload());
+ if (useNewApi) {
+ newInputSplit = getNewSplitDetailsFromEvent(splitProto);
+ LOG.info("Split Details -> SplitClass: "
+ + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
+ setupNewRecordReader();
+ } else {
+ oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
+ LOG.info("Split Details -> SplitClass: "
+ + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
+ setupOldRecordReader();
+ }
+ rrLock.lock();
+ try {
+ LOG.info("Notifying on inited to unblock reader");
+ rrInited.signal();
+ } finally {
+ rrLock.unlock();
+ }
}
@@ -227,8 +314,16 @@ public class MRInput implements LogicalInput {
return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
}
+ /**
+  * Deserializes a mapred-API split from the proto received in an event,
+  * using a SerializationFactory built from the job configuration.
+  */
+ private InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto)
+     throws IOException {
+   return MRHelpers.createOldFormatSplitFromUserPayload(splitProto,
+       new SerializationFactory(jobConf));
+ }
+
@SuppressWarnings("unchecked")
- private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+ private InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo)
throws IOException {
Path file = new Path(splitMetaInfo.getSplitLocation());
FileSystem fs = FileSystem.getLocal(jobConf);
@@ -263,8 +358,16 @@ public class MRInput implements LogicalInput {
return split;
}
+ /**
+  * Deserializes a mapreduce-API split from the proto received in an event,
+  * using a SerializationFactory built from the job configuration.
+  */
+ private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
+     MRSplitProto splitProto) throws IOException {
+   return MRHelpers.createNewFormatSplitFromUserPayload(splitProto,
+       new SerializationFactory(jobConf));
+ }
+
@SuppressWarnings("unchecked")
- private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+ private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
TaskSplitIndex splitMetaInfo) throws IOException {
Path file = new Path(splitMetaInfo.getSplitLocation());
long offset = splitMetaInfo.getStartOffset();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
new file mode 100644
index 0000000..ca9f5b9
--- /dev/null
+++ b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.mapreduce.protos";
+option java_outer_classname = "MRRuntimeProtos";
+option java_generate_equals_and_hash = true;
+
+// Represents multiple MR splits
+message MRSplitsProto {
+ // NOTE(review): not populated by the split-generation code in this change;
+ // presumably reserved for format versioning - confirm.
+ optional int32 version = 1;
+ // One entry per input split.
+ repeated MRSplitProto splits = 2;
+ // May be useful to have a single large field to serialize all splits
+ // and use CodedInputStream to read these efficiently (via a Writable/Hadoop serialization wrapper).
+}
+
+// Represents a single MRSplit
+message MRSplitProto {
+ // Fully qualified class name of the InputSplit implementation; used to
+ // load the class when the split is deserialized.
+ optional string split_class_name = 1;
+ // The serialized form of the split.
+ optional bytes split_bytes = 2;
+}
+
+// User payload for an MR input: the serialized configuration plus,
+// optionally, the complete set of splits.
+message MRUserPayloadProto {
+ // Configuration serialized to bytes.
+ optional bytes configuration_bytes = 1;
+ // Set only when split information is shipped inside the payload.
+ optional MRSplitsProto splits = 2;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 5b8eedf..2593318 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -35,6 +35,7 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.mapreduce.TestUmbilical;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -111,6 +112,7 @@ public class TestMapProcessor {
vertexName);
JobConf job = new JobConf(stageConf);
+ job.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
@@ -120,7 +122,10 @@ public class TestMapProcessor {
MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+ new InputDescriptor(MRInputLegacy.class.getName())
+ .setUserPayload(MRHelpers.createMRInputPayload(job, null)),
+ 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 3910b02..7672983 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -125,11 +126,15 @@ public class TestReduceProcessor {
mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
"localized-resources").toUri().toString());
+ mapConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
Path mapInput = new Path(workDir, "map0");
MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
- InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
+ InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+ new InputDescriptor(MRInputLegacy.class.getName())
+ .setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)),
+ 0);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
// Run a map
LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index ab06e52..63e62ae 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -406,7 +406,8 @@ public class YARNRunner implements ClientProtocol {
setUserPayload(vertexUserPayload),
numTasks, taskResource);
if (isMap) {
- MRHelpers.addMRInput(vertex, vertexUserPayload);
+ byte[] mapInputPayload = MRHelpers.createMRInputPayload(stageConf, null);
+ MRHelpers.addMRInput(vertex, mapInputPayload, null);
}
// Map only jobs.
if (stageNum == totalStages -1) {