You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/12 23:19:31 UTC

git commit: TEZ-432. Make JobToken available to inputs / outputs (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 3d64024d9 -> b8a66679a


TEZ-432. Make JobToken available to inputs / outputs (part of TEZ-398).
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b8a66679
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b8a66679
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b8a66679

Branch: refs/heads/TEZ-398
Commit: b8a66679a30ee924236856ec8f96275a73ca2c0b
Parents: 3d64024
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Sep 12 14:19:02 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Sep 12 14:19:02 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  8 +++---
 .../tez/engine/common/shuffle/impl/Fetcher.java |  8 ++----
 .../tez/engine/common/shuffle/impl/Shuffle.java | 11 +++++++-
 .../engine/lib/output/OnFileSortedOutput.java   |  2 +-
 .../engine/newapi/impl/TezInputContextImpl.java |  6 +++--
 .../newapi/impl/TezOutputContextImpl.java       |  7 +++--
 .../newapi/impl/TezProcessorContextImpl.java    |  7 +++--
 .../engine/newapi/impl/TezTaskContextImpl.java  |  9 ++++---
 .../LogicalIOProcessorRuntimeTask.java          | 27 ++++++++++++++++----
 .../tez/engine/shuffle/common/ShuffleUtils.java | 11 +++++++-
 .../tez/mapreduce/newinput/SimpleInput.java     |  6 +++--
 11 files changed, 73 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 5f72982..66227b0 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -396,7 +396,7 @@ public class YarnTezDagChild {
           currentVertexId = newVertexId;
           updateLoggers(currentTaskAttemptID);
           currentTask = createLogicalTask(
-              taskSpec, defaultConf, tezUmbilical);
+              taskSpec, defaultConf, tezUmbilical, jobToken);
         } finally {
           taskLock.writeLock().unlock();
         }
@@ -537,8 +537,8 @@ public class YarnTezDagChild {
   }
 
   private static LogicalIOProcessorRuntimeTask createLogicalTask(
-      TaskSpec taskSpec, Configuration conf,
-      TezUmbilical tezUmbilical) throws IOException {
+      TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
+      Token<JobTokenIdentifier> jobToken) throws IOException {
 
     // FIXME TODONEWTEZ
     // conf.setBoolean("ipc.client.tcpnodelay", true);
@@ -564,7 +564,7 @@ public class YarnTezDagChild {
           new OutputSpec("null", simpleOutputDesc, 0));
     }
     return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
-        tezUmbilical);
+        tezUmbilical, jobToken);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 86e5b56..0440236 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -49,10 +49,8 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.IFileInputStream;
 import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -98,16 +96,14 @@ class Fetcher extends Thread {
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
-      Shuffle shuffle, TezInputContext inputContext) throws IOException {
+      Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
     this.job = job;
     this.scheduler = scheduler;
     this.merger = merger;
     this.metrics = metrics;
     this.shuffle = shuffle;
     this.id = ++nextId;
-    this.jobTokenSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+    this.jobTokenSecret = jobTokenSecret;
     ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
     wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 9dd213e..7c41d3d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,9 +38,11 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -60,6 +64,7 @@ public class Shuffle implements ExceptionReporter {
   private String throwingThreadName = null;
   private final int numInputs;
   private final AtomicInteger reduceStartId;
+  private final SecretKey jobTokenSecret;
   private AtomicInteger reduceRange = new AtomicInteger(
       TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
   
@@ -75,6 +80,10 @@ public class Shuffle implements ExceptionReporter {
             
     this.numInputs = numInputs;
     
+    this.jobTokenSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+    
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -178,7 +187,7 @@ public class Shuffle implements ExceptionReporter {
               TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
       Fetcher[] fetchers = new Fetcher[numFetchers];
       for (int i = 0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, inputContext);
+        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
         fetchers[i].start();
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 29a4b02..7e57a62 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -98,7 +98,7 @@ public class OnFileSortedOutput implements LogicalOutput {
         .toString());
     ByteBuffer shuffleMetadata = outputContext
         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
-    int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata);
+    int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
 
     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
         .newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index fff2090..72a36d9 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -42,9 +44,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
       TezUmbilical tezUmbilical, String taskVertexName,
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, byte[] userPayload,
-      RuntimeTask runtimeTask) {
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
     super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
-        tezUmbilical);
+        tezUmbilical, serviceConsumerMetadata);
     this.userPayload = userPayload;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index a0695cc..ba48b71 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -42,9 +44,10 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       TezUmbilical tezUmbilical, String taskVertexName,
       String destinationVertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask) {
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
     super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
-        tezUmbilical);
+        tezUmbilical, serviceConsumerMetadata);
     this.userPayload = userPayload;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 4ec55d0..da17468 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
@@ -38,9 +40,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
   public TezProcessorContextImpl(Configuration tezConf,
       TezUmbilical tezUmbilical, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask) {
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
     super(tezConf, vertexName, taskAttemptID, counters, runtimeTask,
-        tezUmbilical);
+        tezUmbilical, serviceConsumerMetadata);
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 2925d05..7a592ae 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.newapi.impl;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -42,12 +43,13 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   protected String uniqueIdentifier;
   protected final RuntimeTask runtimeTask;
   protected final TezUmbilical tezUmbilical;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
 
   @Private
   public TezTaskContextImpl(Configuration conf,
       String taskVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, RuntimeTask runtimeTask,
-      TezUmbilical tezUmbilical) {
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
     this.conf = conf;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
@@ -57,6 +59,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
     this.runtimeTask = runtimeTask;
     this.tezUmbilical = tezUmbilical;
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
   }
 
   @Override
@@ -105,8 +108,8 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
 
   @Override
   public ByteBuffer getServiceConsumerMetaData(String serviceName) {
-    // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
-    return null;
+    return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
+        .asReadOnlyBuffer().rewind();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 05a28d8..b03d3e1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -19,8 +19,10 @@
 package org.apache.tez.engine.newruntime;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +31,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.Input;
 import org.apache.tez.engine.newapi.LogicalIOProcessor;
@@ -50,6 +54,7 @@ import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
 import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
 import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
 import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -73,6 +78,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final LogicalIOProcessor processor;
 
   private final TezCounters tezCounters;
+  
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
 
   private Map<String, LogicalInput> inputMap;
   private Map<String, LogicalOutput> outputMap;
@@ -82,9 +89,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private Map<String, List<Event>> closeInputEventMap;
   private Map<String, List<Event>> closeOutputEventMap;
+  
+  
 
-  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf,
-      TezUmbilical tezUmbilical) {
+  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
+      Configuration tezConf, TezUmbilical tezUmbilical,
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    // TODO Remove jobToken from here post TEZ-421
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
     this.taskSpec = taskSpec;
@@ -97,6 +108,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.processor = createProcessor(processorDescriptor);
     this.tezCounters = new TezCounters();
+    this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+    this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+        ShuffleUtils.convertJobTokenToBytes(jobToken));
     this.state = State.NEW;
   }
 
@@ -219,7 +233,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
         tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
-        inputSpec.getInputDescriptor().getUserPayload(), this);
+        inputSpec.getInputDescriptor().getUserPayload(), this,
+        serviceConsumerMetadata);
     return inputContext;
   }
 
@@ -228,14 +243,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         tezUmbilical, taskSpec.getVertexName(),
         outputSpec.getDestinationVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
-        outputSpec.getOutputDescriptor().getUserPayload(), this);
+        outputSpec.getOutputDescriptor().getUserPayload(), this,
+        serviceConsumerMetadata);
     return outputContext;
   }
 
   private TezProcessorContext createProcessorContext() {
     TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
         tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
-        tezCounters, processorDescriptor.getUserPayload(), this);
+        tezCounters, processorDescriptor.getUserPayload(), this,
+        serviceConsumerMetadata);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
index 3a6b2e4..2326a79 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import javax.crypto.SecretKey;
 
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
@@ -42,7 +43,15 @@ public class ShuffleUtils {
     return sk;
   }
 
-  public static int deserializeShuffleMetaData(ByteBuffer meta)
+  public static ByteBuffer convertJobTokenToBytes(
+      Token<JobTokenIdentifier> jobToken) throws IOException {
+    DataOutputBuffer dob = new DataOutputBuffer();
+    jobToken.write(dob);
+    ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    return bb;
+  }
+
+  public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
       throws IOException {
     DataInputByteBuffer in = new DataInputByteBuffer();
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
index 4e4006a..bb39480 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -173,12 +173,14 @@ public class SimpleInput implements LogicalInput {
       Object key;
       Object value;
       
+      private final boolean localNewApi = useNewApi;
+      
       @SuppressWarnings("unchecked")
       @Override
       public boolean next() throws IOException {
         boolean hasNext = false;
         long bytesInPrev = getInputBytes();
-        if (useNewApi) {
+        if (localNewApi) {
           try {
             hasNext = newRecordReader.nextKeyValue();
           } catch (InterruptedException e) {
@@ -201,7 +203,7 @@ public class SimpleInput implements LogicalInput {
       @Override
       public KVRecord getCurrentKV() throws IOException {
         KVRecord kvRecord = null;
-        if (useNewApi) {
+        if (localNewApi) {
           try {
             valueIterator.setValue(newRecordReader.getCurrentValue());
             kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);