You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/12 23:19:31 UTC
git commit: TEZ-432. Make JobToken available to inputs / outputs
(part of TEZ-398). (sseth)
Updated Branches:
refs/heads/TEZ-398 3d64024d9 -> b8a66679a
TEZ-432. Make JobToken available to inputs / outputs (part of TEZ-398).
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b8a66679
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b8a66679
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b8a66679
Branch: refs/heads/TEZ-398
Commit: b8a66679a30ee924236856ec8f96275a73ca2c0b
Parents: 3d64024
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Sep 12 14:19:02 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Sep 12 14:19:02 2013 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/YarnTezDagChild.java | 8 +++---
.../tez/engine/common/shuffle/impl/Fetcher.java | 8 ++----
.../tez/engine/common/shuffle/impl/Shuffle.java | 11 +++++++-
.../engine/lib/output/OnFileSortedOutput.java | 2 +-
.../engine/newapi/impl/TezInputContextImpl.java | 6 +++--
.../newapi/impl/TezOutputContextImpl.java | 7 +++--
.../newapi/impl/TezProcessorContextImpl.java | 7 +++--
.../engine/newapi/impl/TezTaskContextImpl.java | 9 ++++---
.../LogicalIOProcessorRuntimeTask.java | 27 ++++++++++++++++----
.../tez/engine/shuffle/common/ShuffleUtils.java | 11 +++++++-
.../tez/mapreduce/newinput/SimpleInput.java | 6 +++--
11 files changed, 73 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 5f72982..66227b0 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -396,7 +396,7 @@ public class YarnTezDagChild {
currentVertexId = newVertexId;
updateLoggers(currentTaskAttemptID);
currentTask = createLogicalTask(
- taskSpec, defaultConf, tezUmbilical);
+ taskSpec, defaultConf, tezUmbilical, jobToken);
} finally {
taskLock.writeLock().unlock();
}
@@ -537,8 +537,8 @@ public class YarnTezDagChild {
}
private static LogicalIOProcessorRuntimeTask createLogicalTask(
- TaskSpec taskSpec, Configuration conf,
- TezUmbilical tezUmbilical) throws IOException {
+ TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical,
+ Token<JobTokenIdentifier> jobToken) throws IOException {
// FIXME TODONEWTEZ
// conf.setBoolean("ipc.client.tcpnodelay", true);
@@ -564,7 +564,7 @@ public class YarnTezDagChild {
new OutputSpec("null", simpleOutputDesc, 0));
}
return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
- tezUmbilical);
+ tezUmbilical, jobToken);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 86e5b56..0440236 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -49,10 +49,8 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.engine.common.security.SecureShuffleUtils;
import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.common.sort.impl.IFileInputStream;
import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -98,16 +96,14 @@ class Fetcher extends Thread {
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
ShuffleClientMetrics metrics,
- Shuffle shuffle, TezInputContext inputContext) throws IOException {
+ Shuffle shuffle, SecretKey jobTokenSecret, TezInputContext inputContext) throws IOException {
this.job = job;
this.scheduler = scheduler;
this.merger = merger;
this.metrics = metrics;
this.shuffle = shuffle;
this.id = ++nextId;
- this.jobTokenSecret = ShuffleUtils
- .getJobTokenSecretFromTokenBytes(inputContext
- .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+ this.jobTokenSecret = jobTokenSecret;
ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 9dd213e..7c41d3d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,9 +38,11 @@ import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import com.google.common.base.Preconditions;
@@ -60,6 +64,7 @@ public class Shuffle implements ExceptionReporter {
private String throwingThreadName = null;
private final int numInputs;
private final AtomicInteger reduceStartId;
+ private final SecretKey jobTokenSecret;
private AtomicInteger reduceRange = new AtomicInteger(
TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
@@ -75,6 +80,10 @@ public class Shuffle implements ExceptionReporter {
this.numInputs = numInputs;
+ this.jobTokenSecret = ShuffleUtils
+ .getJobTokenSecretFromTokenBytes(inputContext
+ .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+
FileSystem localFS = FileSystem.getLocal(this.conf);
LocalDirAllocator localDirAllocator =
new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
@@ -178,7 +187,7 @@ public class Shuffle implements ExceptionReporter {
TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
Fetcher[] fetchers = new Fetcher[numFetchers];
for (int i = 0; i < numFetchers; ++i) {
- fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, inputContext);
+ fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
fetchers[i].start();
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 29a4b02..7e57a62 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -98,7 +98,7 @@ public class OnFileSortedOutput implements LogicalOutput {
.toString());
ByteBuffer shuffleMetadata = outputContext
.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
- int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata);
+ int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
.newBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index fff2090..72a36d9 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -18,8 +18,10 @@
package org.apache.tez.engine.newapi.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +44,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
TezUmbilical tezUmbilical, String taskVertexName,
String sourceVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters, byte[] userPayload,
- RuntimeTask runtimeTask) {
+ RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
- tezUmbilical);
+ tezUmbilical, serviceConsumerMetadata);
this.userPayload = userPayload;
this.sourceVertexName = sourceVertexName;
this.sourceInfo = new EventMetaData(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index a0695cc..ba48b71 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -18,8 +18,10 @@
package org.apache.tez.engine.newapi.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +44,10 @@ public class TezOutputContextImpl extends TezTaskContextImpl
TezUmbilical tezUmbilical, String taskVertexName,
String destinationVertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload, RuntimeTask runtimeTask) {
+ byte[] userPayload, RuntimeTask runtimeTask,
+ Map<String, ByteBuffer> serviceConsumerMetadata) {
super(conf, taskVertexName, taskAttemptID, counters, runtimeTask,
- tezUmbilical);
+ tezUmbilical, serviceConsumerMetadata);
this.userPayload = userPayload;
this.destinationVertexName = destinationVertexName;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 4ec55d0..da17468 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -18,8 +18,10 @@
package org.apache.tez.engine.newapi.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.counters.TezCounters;
@@ -38,9 +40,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
public TezProcessorContextImpl(Configuration tezConf,
TezUmbilical tezUmbilical, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
- byte[] userPayload, RuntimeTask runtimeTask) {
+ byte[] userPayload, RuntimeTask runtimeTask,
+ Map<String, ByteBuffer> serviceConsumerMetadata) {
super(tezConf, vertexName, taskAttemptID, counters, runtimeTask,
- tezUmbilical);
+ tezUmbilical, serviceConsumerMetadata);
this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 2925d05..7a592ae 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.engine.newapi.impl;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -42,12 +43,13 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
protected String uniqueIdentifier;
protected final RuntimeTask runtimeTask;
protected final TezUmbilical tezUmbilical;
+ private final Map<String, ByteBuffer> serviceConsumerMetadata;
@Private
public TezTaskContextImpl(Configuration conf,
String taskVertexName, TezTaskAttemptID taskAttemptID,
TezCounters counters, RuntimeTask runtimeTask,
- TezUmbilical tezUmbilical) {
+ TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
this.conf = conf;
this.taskVertexName = taskVertexName;
this.taskAttemptID = taskAttemptID;
@@ -57,6 +59,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
this.runtimeTask = runtimeTask;
this.tezUmbilical = tezUmbilical;
+ this.serviceConsumerMetadata = serviceConsumerMetadata;
}
@Override
@@ -105,8 +108,8 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
@Override
public ByteBuffer getServiceConsumerMetaData(String serviceName) {
- // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
- return null;
+ return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
+ .asReadOnlyBuffer().rewind();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 05a28d8..b03d3e1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -19,8 +19,10 @@
package org.apache.tez.engine.newruntime;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -29,9 +31,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.newapi.Event;
import org.apache.tez.engine.newapi.Input;
import org.apache.tez.engine.newapi.LogicalIOProcessor;
@@ -50,6 +54,7 @@ import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
import com.google.common.base.Preconditions;
@@ -73,6 +78,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private final LogicalIOProcessor processor;
private final TezCounters tezCounters;
+
+ private final Map<String, ByteBuffer> serviceConsumerMetadata;
private Map<String, LogicalInput> inputMap;
private Map<String, LogicalOutput> outputMap;
@@ -82,9 +89,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
private Map<String, List<Event>> closeInputEventMap;
private Map<String, List<Event>> closeOutputEventMap;
+
+
- public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf,
- TezUmbilical tezUmbilical) {
+ public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
+ Configuration tezConf, TezUmbilical tezUmbilical,
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ // TODO Remove jobToken from here post TEZ-421
LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
+ taskSpec);
this.taskSpec = taskSpec;
@@ -97,6 +108,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
this.processorDescriptor = taskSpec.getProcessorDescriptor();
this.processor = createProcessor(processorDescriptor);
this.tezCounters = new TezCounters();
+ this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
+ ShuffleUtils.convertJobTokenToBytes(jobToken));
this.state = State.NEW;
}
@@ -219,7 +233,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
TezInputContext inputContext = new TezInputContextImpl(tezConf,
tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
- inputSpec.getInputDescriptor().getUserPayload(), this);
+ inputSpec.getInputDescriptor().getUserPayload(), this,
+ serviceConsumerMetadata);
return inputContext;
}
@@ -228,14 +243,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
tezUmbilical, taskSpec.getVertexName(),
outputSpec.getDestinationVertexName(),
taskSpec.getTaskAttemptID(), tezCounters,
- outputSpec.getOutputDescriptor().getUserPayload(), this);
+ outputSpec.getOutputDescriptor().getUserPayload(), this,
+ serviceConsumerMetadata);
return outputContext;
}
private TezProcessorContext createProcessorContext() {
TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
- tezCounters, processorDescriptor.getUserPayload(), this);
+ tezCounters, processorDescriptor.getUserPayload(), this,
+ serviceConsumerMetadata);
return processorContext;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
index 3a6b2e4..2326a79 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import javax.crypto.SecretKey;
import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.engine.common.security.JobTokenIdentifier;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
@@ -42,7 +43,15 @@ public class ShuffleUtils {
return sk;
}
- public static int deserializeShuffleMetaData(ByteBuffer meta)
+ public static ByteBuffer convertJobTokenToBytes(
+ Token<JobTokenIdentifier> jobToken) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ jobToken.write(dob);
+ ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ return bb;
+ }
+
+ public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer();
try {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b8a66679/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
index 4e4006a..bb39480 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -173,12 +173,14 @@ public class SimpleInput implements LogicalInput {
Object key;
Object value;
+ private final boolean localNewApi = useNewApi;
+
@SuppressWarnings("unchecked")
@Override
public boolean next() throws IOException {
boolean hasNext = false;
long bytesInPrev = getInputBytes();
- if (useNewApi) {
+ if (localNewApi) {
try {
hasNext = newRecordReader.nextKeyValue();
} catch (InterruptedException e) {
@@ -201,7 +203,7 @@ public class SimpleInput implements LogicalInput {
@Override
public KVRecord getCurrentKV() throws IOException {
KVRecord kvRecord = null;
- if (useNewApi) {
+ if (localNewApi) {
try {
valueIterator.setValue(newRecordReader.getCurrentValue());
kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);