You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/12 20:58:01 UTC
git commit: TEZ-672. Fix Tez specific examples to work on secure
clusters. (sseth)
Updated Branches:
refs/heads/master 008912262 -> 44a7b72bb
TEZ-672. Fix Tez specific examples to work on secure clusters. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/44a7b72b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/44a7b72b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/44a7b72b
Branch: refs/heads/master
Commit: 44a7b72bbb133dbf3f5253f786e6edc3630c4cfd
Parents: 0089122
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Dec 12 11:57:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Dec 12 11:57:35 2013 -0800
----------------------------------------------------------------------
.../org/apache/tez/client/TezClientUtils.java | 9 +++++++--
.../tez/mapreduce/examples/FilterLinesByWord.java | 9 ++++++++-
.../tez/mapreduce/examples/MRRSleepJob.java | 12 +++++++++++-
.../tez/mapreduce/examples/OrderedWordCount.java | 17 ++++++++++++++---
.../mapreduce/common/MRInputAMSplitGenerator.java | 4 ++--
.../tez/mapreduce/hadoop/InputSplitInfo.java | 7 +++++++
.../tez/mapreduce/hadoop/InputSplitInfoDisk.java | 18 +++++++++++++-----
.../tez/mapreduce/hadoop/InputSplitInfoMem.java | 9 ++++++++-
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 17 +++++++++--------
9 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index d72ecc4..d156866 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -225,6 +225,12 @@ public class TezClientUtils {
FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
amConfig.getStagingDir());
+ Path binaryConfPath = new Path(amConfig.getStagingDir(),
+ TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+ binaryConfPath = fs.makeQualified(binaryConfPath);
+
+ // TODO TEZ-674 - Obtain tokens for the staging dir. Ideally TokenCache should be used,
+ // but that's in a separate module. Similarly Master is in a separate module.
// Setup resource requirements
Resource capability = Records.newRecord(Resource.class);
@@ -306,8 +312,7 @@ public class TezClientUtils {
// emit conf as PB file
Configuration finalTezConf = createFinalTezConfForApp(conf,
amConfig.getAMConf());
- Path binaryConfPath = new Path(amConfig.getStagingDir(),
- TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+
FSDataOutputStream amConfPBOutBinaryStream = null;
try {
ConfigurationProto.Builder confProtoBuilder =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 38cd4e1..8c37683 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -91,6 +93,7 @@ public class FilterLinesByWord {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
Configuration conf = new Configuration();
String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+ Credentials credentials = new Credentials();
boolean generateSplitsInClient = false;
@@ -137,6 +140,7 @@ public class FilterLinesByWord {
Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+ TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, conf);
Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
LocalResource dagJarLocalRsrc = LocalResource.newInstance(
@@ -147,7 +151,7 @@ public class FilterLinesByWord {
- AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, null);
+ AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, credentials);
TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
TezSession tezSession = new TezSession("FilterLinesByWordSession", sessionConf);
tezSession.start(); // Why do I need to start the TezSession.
@@ -162,6 +166,9 @@ public class FilterLinesByWord {
InputSplitInfo inputSplitInfo = null;
if (generateSplitsInClient) {
inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+ if (inputSplitInfo.getCredentials() != null) {
+ credentials.addAll(inputSplitInfo.getCredentials());
+ }
}
MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index ade06f1..4353fb1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -416,6 +418,8 @@ public class MRRSleepJob extends Configured implements Tool {
int res = ToolRunner.run(new Configuration(), new MRRSleepJob(), args);
System.exit(res);
}
+
+ private Credentials credentials = new Credentials();
public DAG createDAG(FileSystem remoteFs, Configuration conf,
ApplicationId appId, Path remoteStagingDir,
@@ -541,6 +545,9 @@ public class MRRSleepJob extends Configured implements Tool {
throw new TezUncheckedException("Could not generate input splits", e);
}
}
+ if (inputSplitInfo.getCredentials() != null) {
+ this.credentials.addAll(inputSplitInfo.getCredentials());
+ }
}
DAG dag = new DAG("MRRSleepJob");
@@ -553,6 +560,9 @@ public class MRRSleepJob extends Configured implements Tool {
new Path(remoteStagingDir, "dag_job.jar"));
remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
+
+ TokenCache.obtainTokensForNamenodes(this.credentials, new Path[] { remoteJarPath },
+ mapStageConf);
Map<String, LocalResource> commonLocalResources =
new HashMap<String, LocalResource>();
@@ -827,7 +837,7 @@ public class MRRSleepJob extends Configured implements Tool {
MRHelpers.getMRAMJavaOpts(conf));
AMConfiguration amConfig = new AMConfiguration(null,
- null, conf, null);
+ null, conf, this.credentials);
DAGClient dagClient =
tezClient.submitDAGApplication(appId, dag, amConfig);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index c9f81f8..f6fe7dd 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -44,7 +44,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -145,7 +147,9 @@ public class OrderedWordCount {
}
}
- private static DAG createDAG(FileSystem fs, Configuration conf,
+ private Credentials credentials = new Credentials();
+
+ private DAG createDAG(FileSystem fs, Configuration conf,
Map<String, LocalResource> commonLocalResources, Path stagingDir,
int dagIndex, String inputPath, String outputPath,
boolean generateSplitsInClient) throws Exception {
@@ -329,6 +333,7 @@ public class OrderedWordCount {
TezConfiguration tezConf = new TezConfiguration(conf);
TezClient tezClient = new TezClient(tezConf);
ApplicationId appId = tezClient.createApplication();
+ OrderedWordCount instance = new OrderedWordCount();
FileSystem fs = FileSystem.get(conf);
@@ -338,6 +343,8 @@ public class OrderedWordCount {
Path stagingDir = new Path(stagingDirStr);
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
stagingDir = fs.makeQualified(stagingDir);
+
+ TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
@@ -346,9 +353,12 @@ public class OrderedWordCount {
// No need to add jar containing this class as assumed to be part of
// the tez jars.
+ // TEZ-674 Obtain tokens based on the Input / Output paths. For now, assume the staging dir
+ // is on the same filesystem as the one used for Input/Output.
+
TezSession tezSession = null;
AMConfiguration amConfig = new AMConfiguration(null,
- null, tezConf, null);
+ null, tezConf, instance.credentials);
if (useTezSession) {
LOG.info("Creating Tez Session");
TezSessionConfiguration sessionConfig =
@@ -393,7 +403,8 @@ public class OrderedWordCount {
Map<String, LocalResource> localResources =
new TreeMap<String, LocalResource>();
- DAG dag = createDAG(fs, conf, localResources,
+
+ DAG dag = instance.createDAG(fs, conf, localResources,
stagingDir, dagIndex, inputPath, outputPath,
generateSplitsInClient);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index da9ce55..31847b1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -116,7 +116,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
}
}
inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
- locationHints, splits.length);
+ locationHints, splits.length, null);
} else {
LOG.info("Grouping mapred api input splits");
org.apache.hadoop.mapred.InputSplit[] splits = MRHelpers
@@ -142,7 +142,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
}
}
inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
- locationHints, splits.length);
+ locationHints, splits.length, null);
}
} else {
inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index 0cf52bc..355c30f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -72,4 +73,10 @@ public interface InputSplitInfo {
* Get the {@link Type} of the InputSplitInfo
*/
public abstract Type getType();
+
+ /**
+ * Get {@link Credentials} which may be required to access the splits.
+ * @return the credentials, or null if none are available
+ */
+ public abstract Credentials getCredentials();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
index 6bb3448..981c4d9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.hadoop;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -40,20 +41,22 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
public class InputSplitInfoDisk implements InputSplitInfo {
/// Splits file
- private Path splitsFile;
+ private final Path splitsFile;
/// Meta info file for all the splits information
- private Path splitsMetaInfoFile;
+ private final Path splitsMetaInfoFile;
/// Location hints to determine where to run the tasks
- private List<TaskLocationHint> taskLocationHints;
+ private final List<TaskLocationHint> taskLocationHints;
/// The num of tasks - same as number of splits generated.
- private int numTasks;
+ private final int numTasks;
+ private final Credentials credentials;
public InputSplitInfoDisk(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
- List<TaskLocationHint> taskLocationHints) {
+ List<TaskLocationHint> taskLocationHints, Credentials credentials) {
this.splitsFile = splitsFile;
this.splitsMetaInfoFile = splitsMetaInfoFile;
this.taskLocationHints = taskLocationHints;
this.numTasks = numTasks;
+ this.credentials = credentials;
}
/* (non-Javadoc)
@@ -98,5 +101,10 @@ public class InputSplitInfoDisk implements InputSplitInfo {
throw new UnsupportedOperationException("Not supported for Type: "
+ getType());
}
+
+ @Override
+ public Credentials getCredentials() {
+ return this.credentials;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
index e6be735..b77f668 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.hadoop;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -36,12 +37,14 @@ public class InputSplitInfoMem implements InputSplitInfo {
private final MRSplitsProto splitsProto;
private final List<TaskLocationHint> taskLocationHints;
private final int numTasks;
+ private final Credentials credentials;
public InputSplitInfoMem(MRSplitsProto splitsProto,
- List<TaskLocationHint> taskLocationHints, int numTasks) {
+ List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials) {
this.splitsProto = splitsProto;
this.taskLocationHints = taskLocationHints;
this.numTasks = numTasks;
+ this.credentials = credentials;
}
@Override
@@ -76,4 +79,8 @@ public class InputSplitInfoMem implements InputSplitInfo {
return this.splitsProto;
}
+ @Override
+ public Credentials getCredentials() {
+ return this.credentials;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 90790b7..af1f9e6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -224,7 +225,7 @@ public class MRHelpers {
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
- splits.length, locationHints);
+ splits.length, locationHints, jobContext.getCredentials());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -293,7 +294,7 @@ public class MRHelpers {
return new InputSplitInfoDisk(
JobSubmissionFiles.getJobSplitFile(inputSplitDir),
JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
- splits.length, locationHints);
+ splits.length, locationHints, jobConf.getCredentials());
}
/**
@@ -366,12 +367,12 @@ public class MRHelpers {
Job job = Job.getInstance(conf);
org.apache.hadoop.mapreduce.InputSplit[] splits =
generateNewSplits(job, null, 0);
- splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()));
+ splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()), job.getCredentials());
} else {
LOG.info("Generating mapred api input splits");
org.apache.hadoop.mapred.InputSplit[] splits =
generateOldSplits(jobConf, null, 0);
- splitInfoMem = createSplitsProto(splits);
+ splitInfoMem = createSplitsProto(splits, jobConf.getCredentials());
}
LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+ splitInfoMem.getSplitsProto().getSerializedSize());
@@ -380,7 +381,7 @@ public class MRHelpers {
private static InputSplitInfoMem createSplitsProto(
org.apache.hadoop.mapreduce.InputSplit[] newSplits,
- SerializationFactory serializationFactory) throws IOException,
+ SerializationFactory serializationFactory, Credentials credentials) throws IOException,
InterruptedException {
MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
@@ -392,7 +393,7 @@ public class MRHelpers {
.asList(newSplit.getLocations())), null));
}
return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
- newSplits.length);
+ newSplits.length, credentials);
}
@Private
@@ -419,7 +420,7 @@ public class MRHelpers {
}
private static InputSplitInfoMem createSplitsProto(
- org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+ org.apache.hadoop.mapred.InputSplit[] oldSplits, Credentials credentials) throws IOException {
MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
List<TaskLocationHint> locationHints = Lists
@@ -430,7 +431,7 @@ public class MRHelpers {
.asList(oldSplit.getLocations())), null));
}
return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
- oldSplits.length);
+ oldSplits.length, credentials);
}
@Private