You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:12 UTC
[03/50] [abbrv] tez git commit: TEZ-1666. UserPayload should be null
if the payload is not specified. (sseth)
TEZ-1666. UserPayload should be null if the payload is not specified.
(sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4e69bed5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4e69bed5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4e69bed5
Branch: refs/heads/TEZ-8
Commit: 4e69bed5c7dbc68d17f3d4648838e5acfc52c3eb
Parents: 1ffbc19
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 29 14:22:03 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 29 14:22:03 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/tez/dag/api/DagTypeConverters.java | 56 ++--
tez-api/src/main/proto/DAGApiRecords.proto | 8 +-
.../org/apache/tez/dag/api/TestDAGPlan.java | 18 +-
.../tez/dag/api/TestDagTypeConverters.java | 4 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 11 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 117 ++++----
.../tez/dag/app/dag/impl/TestVertexImpl.java | 268 ++++++++++---------
.../TestHistoryEventsProtoConversion.java | 3 +-
.../runtime/api/impl/TezInputContextImpl.java | 2 +-
.../api/impl/TezMergedInputContextImpl.java | 2 +-
.../runtime/api/impl/TezOutputContextImpl.java | 2 +-
.../api/impl/TezProcessorContextImpl.java | 2 +-
.../examples/BroadcastAndOneToOneExample.java | 3 +-
15 files changed, 275 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d2ccb9..933a445 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,9 @@ ALL CHANGES:
Release 0.5.2: Unreleased
INCOMPATIBLE CHANGES
+ TEZ-1666. UserPayload should be null if the payload is not specified.
+ 0.5.1 client cannot talk to 0.5.2 AMs (TEZ-1666 and TEZ-1664).
+ context.getUserPayload can now return null, apps may need to add defensive code.
ALL CHANGES:
TEZ-1620. Wait for application finish before stopping MiniTezCluster
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 179f3cc..17807d3 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -306,9 +306,13 @@ public class DagTypeConverters {
builder.setClassName(descriptor.getClassName());
UserPayload userPayload = descriptor.getUserPayload();
- if (userPayload != null && userPayload.hasPayload()) {
- builder.setUserPayload(ByteString.copyFrom(descriptor.getUserPayload().getPayload()));
- builder.setVersion(userPayload.getVersion());
+ if (userPayload != null) {
+ DAGProtos.TezUserPayloadProto.Builder payloadBuilder = DAGProtos.TezUserPayloadProto.newBuilder();
+ if (userPayload.hasPayload()) {
+ payloadBuilder.setUserPayload(ByteString.copyFrom(userPayload.getPayload()));
+ payloadBuilder.setVersion(userPayload.getVersion());
+ }
+ builder.setTezUserPayload(payloadBuilder.build());
}
if (descriptor.getHistoryText() != null) {
try {
@@ -348,62 +352,84 @@ public class DagTypeConverters {
private static UserPayload convertTezUserPayloadFromDAGPlan(
TezEntityDescriptorProto proto) {
UserPayload userPayload = null;
- if (proto.hasUserPayload()) {
- userPayload =
- UserPayload.create(proto.getUserPayload().asReadOnlyByteBuffer(), proto.getVersion());
- } else {
- userPayload = UserPayload.create(null, -1);
+ if (proto.hasTezUserPayload()) {
+ if (proto.getTezUserPayload().hasUserPayload()) {
+ userPayload =
+ UserPayload.create(proto.getTezUserPayload().getUserPayload().asReadOnlyByteBuffer(), proto.getTezUserPayload().getVersion());
+ } else {
+ userPayload = UserPayload.create(null);
+ }
}
return userPayload;
}
+ private static void setUserPayload(EntityDescriptor<?> entity, UserPayload payload) {
+ if (payload != null) {
+ entity.setUserPayload(payload);
+ }
+ }
+
public static InputDescriptor convertInputDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return InputDescriptor.create(className).setUserPayload(payload);
+ InputDescriptor id = InputDescriptor.create(className);
+ setUserPayload(id, payload);
+ return id;
}
public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return OutputDescriptor.create(className).setUserPayload(payload);
+ OutputDescriptor od = OutputDescriptor.create(className);
+ setUserPayload(od, payload);
+ return od;
}
public static InputInitializerDescriptor convertInputInitializerDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return InputInitializerDescriptor.create(className).setUserPayload(payload);
+ InputInitializerDescriptor iid = InputInitializerDescriptor.create(className);
+ setUserPayload(iid, payload);
+ return iid;
}
public static OutputCommitterDescriptor convertOutputCommitterDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return OutputCommitterDescriptor.create(className).setUserPayload(payload);
+ OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(className);
+ setUserPayload(ocd, payload);
+ return ocd;
}
public static VertexManagerPluginDescriptor convertVertexManagerPluginDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return VertexManagerPluginDescriptor.create(className).setUserPayload(payload);
+ VertexManagerPluginDescriptor vmpd = VertexManagerPluginDescriptor.create(className);
+ setUserPayload(vmpd, payload);
+ return vmpd;
}
public static EdgeManagerPluginDescriptor convertEdgeManagerPluginDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return EdgeManagerPluginDescriptor.create(className).setUserPayload(payload);
+ EdgeManagerPluginDescriptor empd = EdgeManagerPluginDescriptor.create(className);
+ setUserPayload(empd, payload);
+ return empd;
}
public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
TezEntityDescriptorProto proto) {
String className = proto.getClassName();
UserPayload payload = convertTezUserPayloadFromDAGPlan(proto);
- return ProcessorDescriptor.create(className).setUserPayload(payload);
+ ProcessorDescriptor pd = ProcessorDescriptor.create(className);
+ setUserPayload(pd, payload);
+ return pd;
}
public static TezAppMasterStatus convertTezSessionStatusFromProto(
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 04aa791..177faba 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -112,9 +112,13 @@ message PlanTaskConfiguration {
message TezEntityDescriptorProto {
optional string class_name = 1;
- optional bytes user_payload = 2;
+ optional TezUserPayloadProto tez_user_payload = 2;
optional bytes history_text = 3;
- optional int32 version = 4;
+}
+
+message TezUserPayloadProto {
+ optional bytes user_payload = 1;
+ optional int32 version = 2;
}
message RootInputLeafOutputProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 8cbd611..fccbb08 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -167,19 +167,19 @@ public class TestDAGPlan {
VertexPlan v2Proto = dagProto.getVertex(1);
EdgePlan edgeProto = dagProto.getEdge(0);
- assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
+ assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
- assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
+ assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
- assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+ assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("input", edgeProto.getEdgeDestination().getClassName());
- assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+ assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("output", edgeProto.getEdgeSource().getClassName());
@@ -235,8 +235,8 @@ public class TestDAGPlan {
EdgePlan edgeProto = dagProto.getEdge(0);
// either v1 or v2 will be on top based on topological order
- String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getUserPayload().toByteArray());
- String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getUserPayload().toByteArray());
+ String v1ProtoPayload = new String(v1Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray());
+ String v2ProtoPayload = new String(v2Proto.getProcessorDescriptor().getTezUserPayload().getUserPayload().toByteArray());
assertTrue(v1ProtoPayload.equals("processor1Bytes") || v1ProtoPayload.equals("processor3Bytes"));
assertTrue(v2ProtoPayload.equals("processor1Bytes") || v2ProtoPayload.equals("processor3Bytes"));
assertTrue(v1Proto.getProcessorDescriptor().getClassName().equals("processor1") ||
@@ -244,15 +244,15 @@ public class TestDAGPlan {
assertTrue(v2Proto.getProcessorDescriptor().getClassName().equals("processor1") ||
v2Proto.getProcessorDescriptor().getClassName().equals("processor3"));
- assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor()
+ assertEquals("processor2Bytes", new String(v3Proto.getProcessorDescriptor().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("processor2", v3Proto.getProcessorDescriptor().getClassName());
- assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+ assertEquals("inputBytes", new String(edgeProto.getEdgeDestination().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("input", edgeProto.getEdgeDestination().getClassName());
- assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+ assertEquals("outputBytes", new String(edgeProto.getEdgeSource().getTezUserPayload()
.getUserPayload().toByteArray()));
assertEquals("output", edgeProto.getEdgeSource().getClassName());
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index 9a1cb07..13347bb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -37,8 +37,8 @@ public class TestDagTypeConverters {
.setHistoryText(historytext);
TezEntityDescriptorProto proto =
DagTypeConverters.convertToDAGPlan(entityDescriptor);
- Assert.assertEquals(payload.getVersion(), proto.getVersion());
- Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getUserPayload().toByteArray());
+ Assert.assertEquals(payload.getVersion(), proto.getTezUserPayload().getVersion());
+ Assert.assertArrayEquals(payload.deepCopyAsArray(), proto.getTezUserPayload().getUserPayload().toByteArray());
Assert.assertTrue(proto.hasHistoryText());
Assert.assertNotEquals(historytext, proto.getHistoryText());
Assert.assertEquals(historytext, new String(
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 57d742f..360a839 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -115,25 +115,22 @@ public class Edge {
private void createEdgeManager() {
switch (edgeProperty.getDataMovementType()) {
case ONE_TO_ONE:
- edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+ edgeManagerContext = new EdgeManagerPluginContextImpl(null);
edgeManager = new OneToOneEdgeManager(edgeManagerContext);
break;
case BROADCAST:
- edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+ edgeManagerContext = new EdgeManagerPluginContextImpl(null);
edgeManager = new BroadcastEdgeManager(edgeManagerContext);
break;
case SCATTER_GATHER:
- edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
+ edgeManagerContext = new EdgeManagerPluginContextImpl(null);
edgeManager = new ScatterGatherEdgeManager(edgeManagerContext);
break;
case CUSTOM:
if (edgeProperty.getEdgeManagerDescriptor() != null) {
UserPayload payload = null;
- if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null &&
- edgeProperty.getEdgeManagerDescriptor().getUserPayload().hasPayload()) {
+ if (edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null) {
payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload();
- } else {
- payload = UserPayload.create(null);
}
edgeManagerContext = new EdgeManagerPluginContextImpl(payload);
String edgeManagerClassName = edgeProperty.getEdgeManagerDescriptor().getClassName();
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 50e8b0c..b8e99d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -69,7 +69,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
@@ -2032,15 +2031,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Setting vertexManager to RootInputVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
- VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName())
- .setUserPayload(UserPayload.create(null)),
+ VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName()),
this, appContext);
} else if (hasOneToOne && !hasCustom) {
LOG.info("Setting vertexManager to InputReadyVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
- VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName())
- .setUserPayload(UserPayload.create(null)),
+ VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()),
this, appContext);
} else if (hasBipartite && !hasCustom) {
LOG.info("Setting vertexManager to ShuffleVertexManager for "
@@ -2053,8 +2050,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
LOG.info("Setting vertexManager to ImmediateStartVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(
- VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName())
- .setUserPayload(UserPayload.create(null)),
+ VertexManagerPluginDescriptor.create(ImmediateStartVertexManager.class.getName()),
this, appContext);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index dc90a6f..af48c49 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -650,65 +650,67 @@ public class TestDAGImpl {
.setName("testverteximpl")
.addVertex(
VertexPlan.newBuilder()
- .setName("vertex1")
- .setType(PlanVertexType.NORMAL)
- .addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host1")
- .addRack("rack1")
- .build()
+ .setName("vertex1")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host1")
+ .addRack("rack1")
+ .build()
)
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(1)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x1.y1")
- .build()
- )
- .addOutEdgeId("e1")
- .build()
- )
- .addVertex(
- VertexPlan.newBuilder()
- .setName("vertex2")
- .setType(PlanVertexType.NORMAL)
- .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
- .addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host2")
- .addRack("rack2")
- .build()
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
)
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("foo")
- .setTaskModule("x2.y2")
+ .addOutEdgeId("e1")
.build()
- )
- .addInEdgeId("e1")
- .build()
- )
+ )
+ .addVertex(
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host2")
+ .addRack("rack2")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("foo")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .build()
+ )
.addEdge(
- EdgePlan.newBuilder()
- .setEdgeManager(TezEntityDescriptorProto.newBuilder()
- .setClassName(CustomizedEdgeManager.class.getName())
- .setUserPayload(ByteString.copyFromUtf8(exLocation.name()))
- )
- .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
- .setInputVertexName("vertex1")
- .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
- .setOutputVertexName("vertex2")
- .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
- .setId("e1")
- .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
- .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
- .build()
- )
+ EdgePlan.newBuilder()
+ .setEdgeManager(TezEntityDescriptorProto.newBuilder()
+ .setClassName(CustomizedEdgeManager.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFromUtf8(exLocation.name())))
+ )
+ .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+ .setInputVertexName("vertex1")
+ .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+ .setOutputVertexName("vertex2")
+ .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
+ .setId("e1")
+ .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+ .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+ .build()
+ )
.build();
return dag;
}
@@ -1135,14 +1137,15 @@ public class TestDAGImpl {
TezEntityDescriptorProto
.newBuilder()
.setClassName(CountingOutputCommitter.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
.setUserPayload(
ByteString
.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
- true, false, false).toUserPayload())).build())
+ true, false, false).toUserPayload())).build()))
.setName("output3")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class")
- )
+ )
.build());
badVertex.setAdditionalOutputs(outputs);
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 72dcd92..55ee05f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -219,7 +219,7 @@ public class TestVertexImpl {
@Override
public void initialize() throws IOException {
- if (getContext().getUserPayload().hasPayload()) {
+ if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
CountingOutputCommitterConfig conf =
new CountingOutputCommitterConfig(getContext().getUserPayload());
this.throwError = conf.throwError;
@@ -419,8 +419,9 @@ public class TestVertexImpl {
)
.addOutEdgeId("e1")
.setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
- .setClassName(VertexManagerWithException.class.getName())
- .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))
+ .setClassName(VertexManagerWithException.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))))
.build()
)
.addVertex(
@@ -438,8 +439,9 @@ public class TestVertexImpl {
)
.addInEdgeId("e1")
.setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
- .setClassName(VertexManagerWithException.class.getName())
- .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes())))
+ .setClassName(VertexManagerWithException.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(exLocation.name().getBytes()))))
.build()
)
.addEdge(
@@ -706,24 +708,24 @@ public class TestVertexImpl {
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
initializerClassName))
- .setName("input1")
- .setIODescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
- ).build()
+ .setName("input1")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ ).build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x1.y1")
- .build()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x1.y1")
+ .build()
)
.addOutEdgeId("e1")
- .build()
+ .build()
)
.addVertex(
VertexPlan.newBuilder()
@@ -737,11 +739,11 @@ public class TestVertexImpl {
.setName("input2")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
+ .setClassName("InputClazz")
+ .build()
)
.build()
- )
+ )
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(-1)
@@ -752,7 +754,7 @@ public class TestVertexImpl {
.build()
)
.addInEdgeId("e1")
- .build()
+ .build()
)
.addVertex(
VertexPlan.newBuilder()
@@ -766,8 +768,8 @@ public class TestVertexImpl {
.setName("input3")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
+ .setClassName("InputClazz")
+ .build()
)
.build()
)
@@ -782,40 +784,42 @@ public class TestVertexImpl {
)
.setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
.setClassName(RootInputSpecUpdaterVertexManager.class.getName())
- .setUserPayload(ByteString.copyFrom(new byte[] {0})))
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(new byte[]{0}))))
.build()
)
.addVertex(
- VertexPlan.newBuilder()
- .setName("vertex4")
- .setType(PlanVertexType.NORMAL)
- .addInputs(
- RootInputLeafOutputProto.newBuilder()
- .setControllerDescriptor(
- TezEntityDescriptorProto.newBuilder().setClassName(
- initializerClassName))
- .setName("input4")
- .setIODescriptor(
- TezEntityDescriptorProto.newBuilder()
- .setClassName("InputClazz")
- .build()
+ VertexPlan.newBuilder()
+ .setName("vertex4")
+ .setType(PlanVertexType.NORMAL)
+ .addInputs(
+ RootInputLeafOutputProto.newBuilder()
+ .setControllerDescriptor(
+ TezEntityDescriptorProto.newBuilder().setClassName(
+ initializerClassName))
+ .setName("input4")
+ .setIODescriptor(
+ TezEntityDescriptorProto.newBuilder()
+ .setClassName("InputClazz")
+ .build()
+ )
+ .build()
)
- .build()
- )
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(-1)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x3.y3")
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(-1)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
+ .setClassName(RootInputSpecUpdaterVertexManager.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(new byte[]{1}))))
.build()
)
- .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder()
- .setClassName(RootInputSpecUpdaterVertexManager.class.getName())
- .setUserPayload(ByteString.copyFrom(new byte[] {1})))
- .build()
- )
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
@@ -827,7 +831,7 @@ public class TestVertexImpl {
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
- )
+ )
.build();
return dag;
}
@@ -1404,71 +1408,71 @@ public class TestVertexImpl {
.build()
)
.addVertex(
- VertexPlan.newBuilder()
- .setName("vertex2")
- .setType(PlanVertexType.NORMAL)
- .addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host2")
- .addRack("rack2")
- .build()
- )
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x2.y2")
+ VertexPlan.newBuilder()
+ .setName("vertex2")
+ .setType(PlanVertexType.NORMAL)
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host2")
+ .addRack("rack2")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x2.y2")
+ .build()
+ )
+ .addOutEdgeId("e2")
.build()
- )
- .addOutEdgeId("e2")
- .build()
)
.addVertex(
- VertexPlan.newBuilder()
- .setName("vertex3")
- .setType(PlanVertexType.NORMAL)
- .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
- .addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host3")
- .addRack("rack3")
- .build()
- )
- .setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("foo")
- .setTaskModule("x3.y3")
+ VertexPlan.newBuilder()
+ .setName("vertex3")
+ .setType(PlanVertexType.NORMAL)
+ .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
+ .addTaskLocationHint(
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host3")
+ .addRack("rack3")
+ .build()
+ )
+ .setTaskConfig(
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("foo")
+ .setTaskModule("x3.y3")
+ .build()
+ )
+ .addInEdgeId("e1")
+ .addInEdgeId("e2")
+ .addOutEdgeId("e3")
+ .addOutEdgeId("e4")
.build()
- )
- .addInEdgeId("e1")
- .addInEdgeId("e2")
- .addOutEdgeId("e3")
- .addOutEdgeId("e4")
- .build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex4")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host4")
- .addRack("rack4")
- .build()
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host4")
+ .addRack("rack4")
+ .build()
)
.setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x4.y4")
- .build()
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x4.y4")
+ .build()
)
.addInEdgeId("e3")
.addOutEdgeId("e5")
@@ -1479,19 +1483,19 @@ public class TestVertexImpl {
.setName("vertex5")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
- PlanTaskLocationHint.newBuilder()
- .addHost("host5")
- .addRack("rack5")
- .build()
+ PlanTaskLocationHint.newBuilder()
+ .addHost("host5")
+ .addRack("rack5")
+ .build()
)
.setTaskConfig(
- PlanTaskConfiguration.newBuilder()
- .setNumTasks(2)
- .setVirtualCores(4)
- .setMemoryMb(1024)
- .setJavaOpts("")
- .setTaskModule("x5.y5")
- .build()
+ PlanTaskConfiguration.newBuilder()
+ .setNumTasks(2)
+ .setVirtualCores(4)
+ .setMemoryMb(1024)
+ .setJavaOpts("")
+ .setTaskModule("x5.y5")
+ .build()
)
.addInEdgeId("e4")
.addOutEdgeId("e6")
@@ -1575,9 +1579,10 @@ public class TestVertexImpl {
.setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
.setEdgeManager(
TezEntityDescriptorProto.newBuilder()
- .setClassName(EdgeManagerForTest.class.getName())
- .setUserPayload(ByteString.copyFrom(edgePayload))
- .build())
+ .setClassName(EdgeManagerForTest.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload((ByteString.copyFrom(edgePayload))))
+ .build())
.setId("e4")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
@@ -2833,9 +2838,11 @@ public class TestVertexImpl {
outputs.add(RootInputLeafOutputProto.newBuilder()
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
- CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
- new CountingOutputCommitter.CountingOutputCommitterConfig()
- .toUserPayload())).build())
+ CountingOutputCommitter.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(
+ new CountingOutputCommitter.CountingOutputCommitterConfig()
+ .toUserPayload())).build()))
.setName("output_v2")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -2946,9 +2953,11 @@ public class TestVertexImpl {
outputs.add(RootInputLeafOutputProto.newBuilder()
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
- CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
+ CountingOutputCommitter.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(
new CountingOutputCommitter.CountingOutputCommitterConfig(
- true, true, false).toUserPayload())).build())
+ true, true, false).toUserPayload())).build()))
.setName("output_v2")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -2990,9 +2999,11 @@ public class TestVertexImpl {
outputs.add(RootInputLeafOutputProto.newBuilder()
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
- CountingOutputCommitter.class.getName()).setUserPayload(ByteString.copyFrom(
+ CountingOutputCommitter.class.getName())
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(
new CountingOutputCommitter.CountingOutputCommitterConfig(
- true, true, true).toUserPayload())).build())
+ true, true, true).toUserPayload())).build()))
.setName("output_v2")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
@@ -4162,7 +4173,8 @@ public class TestVertexImpl {
.setEdgeManager(
TezEntityDescriptorProto.newBuilder()
.setClassName(EdgeManagerForTest.class.getName())
- .setUserPayload(ByteString.copyFrom(edgePayload))
+ .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
+ .setUserPayload(ByteString.copyFrom(edgePayload)))
.build())
.setOutputVertexName("M5")
.setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 903b4fe..ad508b6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -325,8 +325,7 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getSourceEdgeManagers().size());
Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
- Assert.assertNotNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
- Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload().getPayload());
+ Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(),
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 420c477..62311e9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -76,7 +76,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
checkNotNull(sourceVertexName, "sourceVertexName is null");
checkNotNull(inputs, "input map is null");
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
- this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+ this.userPayload = userPayload;
this.inputIndex = inputIndex;
this.sourceVertexName = sourceVertexName;
this.sourceInfo = new EventMetaData(
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index 693bde8..e71a0bc 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -46,7 +46,7 @@ public class TezMergedInputContextImpl implements MergedInputContext {
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
this.groupInputName = groupInputName;
this.groupInputsMap = groupInputsMap;
- this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+ this.userPayload = userPayload;
this.inputReadyTracker = inputReadyTracker;
this.workDirs = workDirs;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 94df6aa..7b53075 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -69,7 +69,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
auxServiceEnv, memDist, outputDescriptor, objectRegistry);
checkNotNull(outputIndex, "outputIndex is null");
checkNotNull(destinationVertexName, "destinationVertexName is null");
- this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+ this.userPayload = userPayload;
this.outputIndex = outputIndex;
this.destinationVertexName = destinationVertexName;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index 330c4c9..9b4dc6a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -62,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, processorDescriptor, objectRegistry);
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
- this.userPayload = userPayload == null ? UserPayload.create(null) : userPayload;
+ this.userPayload = userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
this.inputReadyTracker = inputReadyTracker;
http://git-wip-us.apache.org/repos/asf/tez/blob/4e69bed5/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e8a137c..42b31f1 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -72,7 +72,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
.next();
KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
- ByteBuffer userPayload = getContext().getUserPayload().getPayload();
+ ByteBuffer userPayload =
+ getContext().getUserPayload() == null ? null : getContext().getUserPayload().getPayload();
if (userPayload != null) {
boolean doLocalityCheck = getContext().getUserPayload().getPayload().get(0) > 0 ? true : false;
if (doLocalityCheck) {