You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/12 20:58:01 UTC

git commit: TEZ-672. Fix Tez specific examples to work on secure clusters. (sseth)

Updated Branches:
  refs/heads/master 008912262 -> 44a7b72bb


TEZ-672. Fix Tez specific examples to work on secure clusters. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/44a7b72b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/44a7b72b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/44a7b72b

Branch: refs/heads/master
Commit: 44a7b72bbb133dbf3f5253f786e6edc3630c4cfd
Parents: 0089122
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Dec 12 11:57:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Dec 12 11:57:35 2013 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java     |  9 +++++++--
 .../tez/mapreduce/examples/FilterLinesByWord.java |  9 ++++++++-
 .../tez/mapreduce/examples/MRRSleepJob.java       | 12 +++++++++++-
 .../tez/mapreduce/examples/OrderedWordCount.java  | 17 ++++++++++++++---
 .../mapreduce/common/MRInputAMSplitGenerator.java |  4 ++--
 .../tez/mapreduce/hadoop/InputSplitInfo.java      |  7 +++++++
 .../tez/mapreduce/hadoop/InputSplitInfoDisk.java  | 18 +++++++++++++-----
 .../tez/mapreduce/hadoop/InputSplitInfoMem.java   |  9 ++++++++-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java    | 17 +++++++++--------
 9 files changed, 79 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index d72ecc4..d156866 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -225,6 +225,12 @@ public class TezClientUtils {
 
     FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
         amConfig.getStagingDir());
+    Path binaryConfPath =  new Path(amConfig.getStagingDir(),
+        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    binaryConfPath = fs.makeQualified(binaryConfPath);
+    
+    // TODO TEZ-674 - Obtain tokens for the staging dir. Ideally TokenCache should be used,
+    // but that's in a separate module. Similarly Master is in a separate module.
 
     // Setup resource requirements
     Resource capability = Records.newRecord(Resource.class);
@@ -306,8 +312,7 @@ public class TezClientUtils {
     // emit conf as PB file
     Configuration finalTezConf = createFinalTezConfForApp(conf,
       amConfig.getAMConf());
-    Path binaryConfPath =  new Path(amConfig.getStagingDir(),
-        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
       ConfigurationProto.Builder confProtoBuilder =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 38cd4e1..8c37683 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -91,6 +93,7 @@ public class FilterLinesByWord {
   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
     Configuration conf = new Configuration();
     String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+    Credentials credentials = new Credentials();
 
     boolean generateSplitsInClient = false;
 
@@ -137,6 +140,7 @@ public class FilterLinesByWord {
     Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
     fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
     FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+    TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, conf);
 
     Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
     LocalResource dagJarLocalRsrc = LocalResource.newInstance(
@@ -147,7 +151,7 @@ public class FilterLinesByWord {
 
 
 
-    AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, null);
+    AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, credentials);
     TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
     TezSession tezSession = new TezSession("FilterLinesByWordSession", sessionConf);
     tezSession.start(); // Why do I need to start the TezSession.
@@ -162,6 +166,9 @@ public class FilterLinesByWord {
     InputSplitInfo inputSplitInfo = null;
     if (generateSplitsInClient) {
       inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+      if (inputSplitInfo.getCredentials() != null) {
+        credentials.addAll(inputSplitInfo.getCredentials());
+      }
     }
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index ade06f1..4353fb1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -51,6 +51,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -416,6 +418,8 @@ public class MRRSleepJob extends Configured implements Tool {
     int res = ToolRunner.run(new Configuration(), new MRRSleepJob(), args);
     System.exit(res);
   }
+  
+  private Credentials credentials = new Credentials();
 
   public DAG createDAG(FileSystem remoteFs, Configuration conf,
       ApplicationId appId, Path remoteStagingDir,
@@ -541,6 +545,9 @@ public class MRRSleepJob extends Configured implements Tool {
           throw new TezUncheckedException("Could not generate input splits", e);
         }
       }
+      if (inputSplitInfo.getCredentials() != null) {
+        this.credentials.addAll(inputSplitInfo.getCredentials());
+      }
     }
 
     DAG dag = new DAG("MRRSleepJob");
@@ -553,6 +560,9 @@ public class MRRSleepJob extends Configured implements Tool {
         new Path(remoteStagingDir, "dag_job.jar"));
     remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
     FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
+    
+    TokenCache.obtainTokensForNamenodes(this.credentials, new Path[] { remoteJarPath },
+        mapStageConf);
 
     Map<String, LocalResource> commonLocalResources =
         new HashMap<String, LocalResource>();
@@ -827,7 +837,7 @@ public class MRRSleepJob extends Configured implements Tool {
         MRHelpers.getMRAMJavaOpts(conf));
 
     AMConfiguration amConfig = new AMConfiguration(null,
-        null, conf, null);
+        null, conf, this.credentials);
 
     DAGClient dagClient =
         tezClient.submitDAGApplication(appId, dag, amConfig);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index c9f81f8..f6fe7dd 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -44,7 +44,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -145,7 +147,9 @@ public class OrderedWordCount {
     }
   }
 
-  private static DAG createDAG(FileSystem fs, Configuration conf,
+  private Credentials credentials = new Credentials();
+  
+  private DAG createDAG(FileSystem fs, Configuration conf,
       Map<String, LocalResource> commonLocalResources, Path stagingDir,
       int dagIndex, String inputPath, String outputPath,
       boolean generateSplitsInClient) throws Exception {
@@ -329,6 +333,7 @@ public class OrderedWordCount {
     TezConfiguration tezConf = new TezConfiguration(conf);
     TezClient tezClient = new TezClient(tezConf);
     ApplicationId appId = tezClient.createApplication();
+    OrderedWordCount instance = new OrderedWordCount();
 
     FileSystem fs = FileSystem.get(conf);
 
@@ -338,6 +343,8 @@ public class OrderedWordCount {
     Path stagingDir = new Path(stagingDirStr);
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
     stagingDir = fs.makeQualified(stagingDir);
+    
+    TokenCache.obtainTokensForNamenodes(instance.credentials, new Path[] {stagingDir}, conf);
     TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
 
     tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
@@ -346,9 +353,12 @@ public class OrderedWordCount {
     // No need to add jar containing this class as assumed to be part of
     // the tez jars.
 
+    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
+    // is the same filesystem as the one used for Input/Output.
+    
     TezSession tezSession = null;
     AMConfiguration amConfig = new AMConfiguration(null,
-        null, tezConf, null);
+        null, tezConf, instance.credentials);
     if (useTezSession) {
       LOG.info("Creating Tez Session");
       TezSessionConfiguration sessionConfig =
@@ -393,7 +403,8 @@ public class OrderedWordCount {
 
         Map<String, LocalResource> localResources =
           new TreeMap<String, LocalResource>();
-        DAG dag = createDAG(fs, conf, localResources,
+        
+        DAG dag = instance.createDAG(fs, conf, localResources,
             stagingDir, dagIndex, inputPath, outputPath,
             generateSplitsInClient);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index da9ce55..31847b1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -116,7 +116,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
           }
         }
         inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
-            locationHints, splits.length);
+            locationHints, splits.length, null);
       } else {
         LOG.info("Grouping mapred api input splits");
         org.apache.hadoop.mapred.InputSplit[] splits = MRHelpers
@@ -142,7 +142,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
           }
         }
         inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
-            locationHints, splits.length);
+            locationHints, splits.length, null);
       }
     } else {
       inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index 0cf52bc..355c30f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
@@ -72,4 +73,10 @@ public interface InputSplitInfo {
    * Get the {@link Type} of the InputSplitInfo
    */
   public abstract Type getType();
+  
+  /**
+   * Get {@link Credentials} which may be required to access the splits.
+   * @return the {@link Credentials} associated with the splits; may be null if none were collected
+   */
+  public abstract Credentials getCredentials();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
index 6bb3448..981c4d9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.hadoop;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
@@ -40,20 +41,22 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 public class InputSplitInfoDisk implements InputSplitInfo {
 
   /// Splits file
-  private Path splitsFile;
+  private final Path splitsFile;
   /// Meta info file for all the splits information
-  private Path splitsMetaInfoFile;
+  private final Path splitsMetaInfoFile;
   /// Location hints to determine where to run the tasks
-  private List<TaskLocationHint> taskLocationHints;
+  private final List<TaskLocationHint> taskLocationHints;
   /// The num of tasks - same as number of splits generated.
-  private int numTasks;
+  private final int numTasks;
+  private final Credentials credentials;
 
   public InputSplitInfoDisk(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
-      List<TaskLocationHint> taskLocationHints) {
+      List<TaskLocationHint> taskLocationHints, Credentials credentials) {
     this.splitsFile = splitsFile;
     this.splitsMetaInfoFile = splitsMetaInfoFile;
     this.taskLocationHints = taskLocationHints;
     this.numTasks = numTasks;
+    this.credentials = credentials;
   }
 
   /* (non-Javadoc)
@@ -98,5 +101,10 @@ public class InputSplitInfoDisk implements InputSplitInfo {
     throw new UnsupportedOperationException("Not supported for Type: "
         + getType());
   }
+  
+  @Override
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
index e6be735..b77f668 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -21,6 +21,7 @@ package org.apache.tez.mapreduce.hadoop;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
@@ -36,12 +37,14 @@ public class InputSplitInfoMem implements InputSplitInfo {
   private final MRSplitsProto splitsProto;
   private final List<TaskLocationHint> taskLocationHints;
   private final int numTasks;
+  private final Credentials credentials;
 
   public InputSplitInfoMem(MRSplitsProto splitsProto,
-      List<TaskLocationHint> taskLocationHints, int numTasks) {
+      List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials) {
     this.splitsProto = splitsProto;
     this.taskLocationHints = taskLocationHints;
     this.numTasks = numTasks;
+    this.credentials = credentials;
   }
 
   @Override
@@ -76,4 +79,8 @@ public class InputSplitInfoMem implements InputSplitInfo {
     return this.splitsProto;
   }
 
+  @Override
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44a7b72b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 90790b7..af1f9e6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -224,7 +225,7 @@ public class MRHelpers {
     return new InputSplitInfoDisk(
         JobSubmissionFiles.getJobSplitFile(inputSplitDir),
         JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
-        splits.length, locationHints);
+        splits.length, locationHints, jobContext.getCredentials());
   }
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -293,7 +294,7 @@ public class MRHelpers {
     return new InputSplitInfoDisk(
         JobSubmissionFiles.getJobSplitFile(inputSplitDir),
         JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
-        splits.length, locationHints);
+        splits.length, locationHints, jobConf.getCredentials());
   }
 
   /**
@@ -366,12 +367,12 @@ public class MRHelpers {
       Job job = Job.getInstance(conf);
       org.apache.hadoop.mapreduce.InputSplit[] splits = 
           generateNewSplits(job, null, 0);
-      splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()));
+      splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()), job.getCredentials());
     } else {
       LOG.info("Generating mapred api input splits");
       org.apache.hadoop.mapred.InputSplit[] splits = 
           generateOldSplits(jobConf, null, 0);
-      splitInfoMem = createSplitsProto(splits);
+      splitInfoMem = createSplitsProto(splits, jobConf.getCredentials());
     }
     LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
         + splitInfoMem.getSplitsProto().getSerializedSize());
@@ -380,7 +381,7 @@ public class MRHelpers {
 
   private static InputSplitInfoMem createSplitsProto(
       org.apache.hadoop.mapreduce.InputSplit[] newSplits,
-      SerializationFactory serializationFactory) throws IOException,
+      SerializationFactory serializationFactory, Credentials credentials) throws IOException,
       InterruptedException {
     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
 
@@ -392,7 +393,7 @@ public class MRHelpers {
           .asList(newSplit.getLocations())), null));
     }
     return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
-        newSplits.length);
+        newSplits.length, credentials);
   }
 
   @Private
@@ -419,7 +420,7 @@ public class MRHelpers {
   }
 
   private static InputSplitInfoMem createSplitsProto(
-      org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+      org.apache.hadoop.mapred.InputSplit[] oldSplits, Credentials credentials) throws IOException {
     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
 
     List<TaskLocationHint> locationHints = Lists
@@ -430,7 +431,7 @@ public class MRHelpers {
           .asList(oldSplit.getLocations())), null));
     }
     return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
-        oldSplits.length);
+        oldSplits.length, credentials);
   }
 
   @Private