You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/23 08:00:33 UTC
git commit: TEZ-880. Support sending deserialized data in
RootInputDataInformationEvents. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 25cb89305 -> 1c112c103
TEZ-880. Support sending deserialized data in
RootInputDataInformationEvents. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1c112c10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1c112c10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1c112c10
Branch: refs/heads/master
Commit: 1c112c1034a3da21bb709c3708d358323c1a80eb
Parents: 25cb893
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Feb 22 23:00:15 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Feb 22 23:00:15 2014 -0800
----------------------------------------------------------------------
.../events/RootInputDataInformationEvent.java | 35 +++++++-
.../common/MRInputAMSplitGenerator.java | 55 +++++++-----
.../common/MRInputSplitDistributor.java | 43 +++++++--
.../tez/mapreduce/hadoop/InputSplitInfoMem.java | 95 ++++++++++++++++++--
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 78 +++++++++-------
.../tez/mapreduce/hadoop/MRJobConfig.java | 4 +
.../org/apache/tez/mapreduce/input/MRInput.java | 21 +++--
7 files changed, 247 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
index 0b0d32f..25fd9de 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
@@ -18,23 +18,47 @@
package org.apache.tez.runtime.api.events;
+import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezRootInputInitializer;
/**
* Events used by {@link TezRootInputInitializer} implementations to provide the
- * byte payload for individual tasks running as part of the Vertex for which an
+ * user payload for individual tasks running as part of the Vertex for which an
* Initial Input has been configured.
+ *
+ * This event is used by InputInitializers to configure tasks belonging to a
+ * Vertex. The event may be processed by a {@link VertexManagerPlugin}
+ * before being sent to tasks.
+ *
+ * A {@link TezRootInputInitializer} may send Events with or without a
+ * serialized user payload.
+ *
+ * Events, after being processed by a {@link VertexManagerPlugin}, must
+ * contain the payload in a serialized form.
*/
public final class RootInputDataInformationEvent extends Event {
private final int sourceIndex;
private int targetIndex; // TODO Likely to be multiple at a later point.
private final byte[] userPayload;
+ private final Object userPayloadObject;
+ /**
+ * Provide a serialized form of the payload
+ * @param srcIndex the src index
+ * @param userPayload the serialized payload
+ */
public RootInputDataInformationEvent(int srcIndex, byte[] userPayload) {
this.sourceIndex = srcIndex;
this.userPayload = userPayload;
+ this.userPayloadObject = null;
+ }
+
+ public RootInputDataInformationEvent(int srcIndex, Object userPayloadDeserialized) {
+ this.sourceIndex = srcIndex;
+ this.userPayloadObject = userPayloadDeserialized;
+ this.userPayload = null;
}
public int getSourceIndex() {
@@ -52,10 +76,15 @@ public final class RootInputDataInformationEvent extends Event {
public byte[] getUserPayload() {
return this.userPayload;
}
+
+ public Object getDeserializedUserPayload() {
+ return this.userPayloadObject;
+ }
@Override
public String toString() {
return "RootInputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
- + targetIndex + "]";
- }
+ + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+ + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index e87feae..414778f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -26,13 +26,13 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -47,6 +47,8 @@ import com.google.common.collect.Lists;
public class MRInputAMSplitGenerator implements TezRootInputInitializer {
+ private boolean sendSerializedEvents;
+
private static final Log LOG = LogFactory
.getLog(MRInputAMSplitGenerator.class);
@@ -72,6 +74,10 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
}
Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
+ sendSerializedEvents = conf.getBoolean(
+ MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD,
+ MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT);
+ LOG.info("Emitting serialized splits: " + sendSerializedEvents);
if (LOG.isDebugEnabled()) {
sw.stop();
LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
@@ -104,16 +110,11 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits = MRHelpers
.generateNewSplits(job, realInputFormatName, numTasks);
- SerializationFactory serializationFactory = new SerializationFactory(
- job.getConfiguration());
-
- MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+ // Move all this into a function
List<TaskLocationHint> locationHints = Lists
.newArrayListWithCapacity(splits.length);
for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
- splitsBuilder.addSplits(MRHelpers.createSplitProto(split,
- serializationFactory));
String rack =
((org.apache.hadoop.mapreduce.split.TezGroupedSplit) split).getRack();
if (rack == null) {
@@ -128,17 +129,14 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
Collections.singleton(rack)));
}
}
- inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
- locationHints, splits.length, null);
+ inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
} else {
LOG.info("Grouping mapred api input splits");
org.apache.hadoop.mapred.InputSplit[] splits = MRHelpers
.generateOldSplits(jobConf, realInputFormatName, numTasks);
List<TaskLocationHint> locationHints = Lists
.newArrayListWithCapacity(splits.length);
- MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
for (org.apache.hadoop.mapred.InputSplit split : splits) {
- splitsBuilder.addSplits(MRHelpers.createSplitProto(split));
String rack =
((org.apache.hadoop.mapred.split.TezGroupedSplit) split).getRack();
if (rack == null) {
@@ -153,8 +151,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
Collections.singleton(rack)));
}
}
- inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
- locationHints, splits.length, null);
+ inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
}
} else {
inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);
@@ -171,16 +168,30 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
inputSplitInfo.getNumTasks(), inputSplitInfo.getTaskLocationHints());
events.add(configureVertexEvent);
- MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
-
- int count = 0;
- for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
- // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
- // raw array.
- RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
- count++, mrSplit.toByteArray());
- events.add(diEvent);
+ if (sendSerializedEvents) {
+ MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+ int count = 0;
+ for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
+ // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array.
+ RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++,
+ mrSplit.toByteArray());
+ events.add(diEvent);
+ }
+ } else {
+ int count = 0;
+ if (inputSplitInfo.holdsNewFormatSplits()) {
+ for (org.apache.hadoop.mapreduce.InputSplit split : inputSplitInfo.getNewFormatSplits()) {
+ RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+ events.add(diEvent);
+ }
+ } else {
+ for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
+ RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+ events.add(diEvent);
+ }
+ }
}
+
return events;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index b6bdcfc..b1a0a4d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -23,7 +23,11 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -38,8 +42,9 @@ import com.google.common.collect.Lists;
public class MRInputSplitDistributor implements TezRootInputInitializer {
- private static final Log LOG = LogFactory
- .getLog(MRInputSplitDistributor.class);
+ private static final Log LOG = LogFactory.getLog(MRInputSplitDistributor.class);
+
+ private boolean sendSerializedEvents;
public MRInputSplitDistributor() {
}
@@ -59,7 +64,14 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
-
+ Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+ .getConfigurationBytes());
+ JobConf jobConf = new JobConf(conf);
+ boolean useNewApi = jobConf.getUseNewMapper();
+ sendSerializedEvents = conf.getBoolean(
+ MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD,
+ MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT);
+ LOG.info("Emitting serialized splits: " + sendSerializedEvents);
this.splitsProto = userPayloadProto.getSplits();
@@ -72,12 +84,27 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
events.add(updatePayloadEvent);
int count = 0;
+
for (MRSplitProto mrSplit : this.splitsProto.getSplitsList()) {
- // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
- // raw array.
- RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
- count++, mrSplit.toByteArray());
- events.add(diEvent);
+
+ RootInputDataInformationEvent diEvent;
+
+ if (sendSerializedEvents) {
+ // Unnecessary array copy, can be avoided by using ByteBuffer instead of
+ // a raw array.
+ diEvent = new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+ } else {
+ if (useNewApi) {
+ org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInput
+ .getNewSplitDetailsFromEvent(mrSplit, conf);
+ diEvent = new RootInputDataInformationEvent(count++, newInputSplit);
+ } else {
+ org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInput.getOldSplitDetailsFromEvent(
+ mrSplit, conf);
+ diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
+ }
+ events.add(diEvent);
+ }
}
return events;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
index b77f668..6cdbaaa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -18,13 +18,18 @@
package org.apache.tez.mapreduce.hadoop;
+import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.security.Credentials;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import com.google.common.base.Preconditions;
+
/**
* Represents InputSplitInfo for splits generated to memory. </p>
*
@@ -34,17 +39,40 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
*/
public class InputSplitInfoMem implements InputSplitInfo {
- private final MRSplitsProto splitsProto;
- private final List<TaskLocationHint> taskLocationHints;
+// private final MRSplitsProto splitsProto;
+ private final boolean isNewSplit;
private final int numTasks;
private final Credentials credentials;
+ private final Configuration conf;
+ private final List<TaskLocationHint> taskLocationHints;
+
+ private org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits;
+ private org.apache.hadoop.mapred.InputSplit[] oldFormatSplits;
- public InputSplitInfoMem(MRSplitsProto splitsProto,
- List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials) {
- this.splitsProto = splitsProto;
+ // TaskLocationHints accepted as a parameter since InputSplits don't have rack
+ // info, and it can't always be derived.
+ public InputSplitInfoMem(org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+ List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials,
+ Configuration conf) {
+ this.isNewSplit = true;
+ this.newFormatSplits = newSplits;
this.taskLocationHints = taskLocationHints;
this.numTasks = numTasks;
this.credentials = credentials;
+ this.conf = conf;
+ }
+
+ // TaskLocationHints accepted as a parameter since InputSplits don't have rack
+ // info, and it can't always be derived.
+ public InputSplitInfoMem(org.apache.hadoop.mapred.InputSplit[] oldSplits,
+ List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials,
+ Configuration conf) {
+ this.isNewSplit = false;
+ this.oldFormatSplits = oldSplits;
+ this.taskLocationHints = taskLocationHints;
+ this.numTasks = numTasks;
+ this.credentials = credentials;
+ this.conf = conf;
}
@Override
@@ -76,11 +104,66 @@ public class InputSplitInfoMem implements InputSplitInfo {
@Override
public MRSplitsProto getSplitsProto() {
- return this.splitsProto;
+ if (isNewSplit) {
+ try {
+ return createSplitsProto(newFormatSplits, new SerializationFactory(conf));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ try {
+ return createSplitsProto(oldFormatSplits);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
public Credentials getCredentials() {
return this.credentials;
}
+
+ public boolean holdsNewFormatSplits() {
+ return this.isNewSplit;
+ }
+
+ public org.apache.hadoop.mapreduce.InputSplit[] getNewFormatSplits() {
+ Preconditions
+ .checkState(
+ isNewSplit == true,
+ "Cannot fetch newSplits for an instance handling oldFormatSplits. Use holdsNewFormatSplits() to check type");
+ return newFormatSplits;
+ }
+
+ public org.apache.hadoop.mapred.InputSplit[] getOldFormatSplits() {
+ Preconditions
+ .checkState(
+ isNewSplit == false,
+ "Cannot fetch oldSplits for an instance handling newFormatSplits. Use holdsNewFormatSplits() to check type");
+ return oldFormatSplits;
+ }
+
+ private static MRSplitsProto createSplitsProto(
+ org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+ SerializationFactory serializationFactory) throws IOException,
+ InterruptedException {
+ MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+ for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
+ splitsBuilder.addSplits(MRHelpers.createSplitProto(newSplit, serializationFactory));
+ }
+ return splitsBuilder.build();
+ }
+
+ private static MRSplitsProto createSplitsProto(
+ org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+ MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+ for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
+ splitsBuilder.addSplits(MRHelpers.createSplitProto(oldSplit));
+ }
+ return splitsBuilder.build();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index a01503c..91fa4ae 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -81,7 +80,9 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.api.TezRootInputInitializer;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
@@ -369,34 +370,21 @@ public class MRHelpers {
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(job, null, 0);
- splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()), job.getCredentials());
+ splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+ splits.length, job.getCredentials(), job.getConfiguration());
} else {
LOG.info("Generating mapred api input splits");
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, null, 0);
- splitInfoMem = createSplitsProto(splits, jobConf.getCredentials());
+ splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+ splits.length, jobConf.getCredentials(), jobConf);
}
LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+ splitInfoMem.getSplitsProto().getSerializedSize());
return splitInfoMem;
}
- private static InputSplitInfoMem createSplitsProto(
- org.apache.hadoop.mapreduce.InputSplit[] newSplits,
- SerializationFactory serializationFactory, Credentials credentials) throws IOException,
- InterruptedException {
- MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
-
- List<TaskLocationHint> locationHints = Lists
- .newArrayListWithCapacity(newSplits.length);
- for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
- splitsBuilder.addSplits(createSplitProto(newSplit, serializationFactory));
- locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
- .asList(newSplit.getLocations())), null));
- }
- return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
- newSplits.length, credentials);
- }
+
@Private
public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
@@ -421,21 +409,6 @@ public class MRHelpers {
return builder.build();
}
- private static InputSplitInfoMem createSplitsProto(
- org.apache.hadoop.mapred.InputSplit[] oldSplits, Credentials credentials) throws IOException {
- MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
-
- List<TaskLocationHint> locationHints = Lists
- .newArrayListWithCapacity(oldSplits.length);
- for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
- splitsBuilder.addSplits(createSplitProto(oldSplit));
- locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
- .asList(oldSplit.getLocations())), null));
- }
- return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
- oldSplits.length, credentials);
- }
-
@Private
public static MRSplitProto createSplitProto(
org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
@@ -1008,4 +981,41 @@ public class MRHelpers {
deserializer.close();
return inputSplit;
}
+
+ private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+ org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) {
+ Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(newFormatSplits),
+ new Function<org.apache.hadoop.mapreduce.InputSplit, TaskLocationHint>() {
+ @Override
+ public TaskLocationHint apply(org.apache.hadoop.mapreduce.InputSplit input) {
+ try {
+ return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
+ null);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ return Lists.newArrayList(iterable);
+ }
+
+ private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+ org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
+ Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
+ new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
+ @Override
+ public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
+ try {
+ return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
+ null);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ return Lists.newArrayList(iterable);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 45de43a..eb9b005 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -624,4 +624,8 @@ public interface MRJobConfig {
public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
+ public static final String MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD = MR_TEZ_PREFIX
+ + "input.initializer.serialize.event.payload";
+ public static final boolean MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT = true;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 706f4fe..383b952 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -373,12 +373,12 @@ public class MRInput implements LogicalInput {
MRSplitProto splitProto = MRSplitProto
.parseFrom(initEvent.getUserPayload());
if (useNewApi) {
- newInputSplit = getNewSplitDetailsFromEvent(splitProto);
+ newInputSplit = getNewSplitDetailsFromEvent(splitProto, jobConf);
LOG.info("Split Details -> SplitClass: "
+ newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
setupNewRecordReader();
} else {
- oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
+ oldInputSplit = getOldSplitDetailsFromEvent(splitProto, jobConf);
LOG.info("Split Details -> SplitClass: "
+ oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
setupOldRecordReader();
@@ -386,12 +386,11 @@ public class MRInput implements LogicalInput {
LOG.info("Initialized RecordReader from event");
}
- private InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto)
+ @Private
+ public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
throws IOException {
- SerializationFactory serializationFactory = new SerializationFactory(
- jobConf);
- return MRHelpers.createOldFormatSplitFromUserPayload(splitProto,
- serializationFactory);
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
}
@SuppressWarnings("unchecked")
@@ -430,10 +429,10 @@ public class MRInput implements LogicalInput {
return split;
}
- private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
- MRSplitProto splitProto) throws IOException {
- SerializationFactory serializationFactory = new SerializationFactory(
- jobConf);
+ @Private
+ public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
+ MRSplitProto splitProto, Configuration conf) throws IOException {
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
return MRHelpers.createNewFormatSplitFromUserPayload(
splitProto, serializationFactory);
}