You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/30 23:30:16 UTC
tez git commit: TEZ-3286. Allow clients to set processor reserved
memory per vertex (instead of per container). Contributed by Hitesh Shah.
Repository: tez
Updated Branches:
refs/heads/master 71bb2defe -> 3b08cbf90
TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). Contributed by Hitesh Shah.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b08cbf9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b08cbf9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b08cbf9
Branch: refs/heads/master
Commit: 3b08cbf907784de463c9e3c05147b5c6d681251d
Parents: 71bb2de
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 30 16:30:01 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 30 16:30:01 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +--
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 3 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 22 +++--
.../tez/dag/app/dag/impl/TestVertexImpl2.java | 28 +++++-
.../tez/dag/app/rm/TestContainerReuse.java | 2 +-
.../org/apache/tez/util/ProtoConverters.java | 31 ++++++-
.../src/test/proto/TezDaemonProtocol.proto | 1 +
.../tez/mapreduce/output/TestMROutput.java | 2 +-
.../tez/mapreduce/processor/MapUtils.java | 2 +-
.../processor/reduce/TestReduceProcessor.java | 2 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 6 ++
.../apache/tez/runtime/api/impl/TaskSpec.java | 53 ++++++++++--
.../apache/tez/runtime/task/TezTaskRunner2.java | 13 ++-
.../TestLogicalIOProcessorRuntimeTask.java | 2 +-
.../tez/runtime/api/impl/TestTaskSpec.java | 91 ++++++++++++++++++++
.../tez/runtime/task/TestTaskExecution2.java | 2 +-
.../tez/runtime/task/TestTezTaskRunner2.java | 65 ++++++++++++++
20 files changed, 326 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebc4e79..9a5904f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3314. Double counting input bytes in MultiMRInput.
TEZ-3308. Add counters to capture input split length.
@@ -70,6 +71,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3314. Double counting input bytes in MultiMRInput.
TEZ-3308. Add counters to capture input split length.
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4118bb5..936c5db 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -807,7 +807,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty(type="boolean")
public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
+ "scale.memory.enabled";
@@ -819,7 +819,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
+ "scale.memory.allocator.class";
@@ -833,7 +833,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty(type="double")
public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
+ "scale.memory.reserve-fraction";
@@ -846,7 +846,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty(type="float")
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
@@ -856,7 +856,7 @@ public class TezConfiguration extends Configuration {
/**
* Max cumulative total reservation for additional IOs.
*/
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty(type="float")
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
@@ -867,7 +867,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
TEZ_TASK_PREFIX + "scale.memory.ratios";
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index a6c6c02..fd6d446 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -198,7 +198,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private long cachedCountersTimestamp = 0;
private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
+ // Combined configs for the DAG
private final Configuration dagConf;
+ // DAG specific configs only
+ // Useful when trying to serialize only the diff from global configs
+ private final Configuration dagOnlyConf;
+
private final DAGPlan jobPlan;
private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
@@ -499,6 +504,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.dagId = dagId;
this.jobPlan = jobPlan;
this.dagConf = new Configuration(amConf);
+ this.dagOnlyConf = new Configuration(false);
Iterator<PlanKeyValuePair> iter =
jobPlan.getDagConf().getConfKeyValuesList().iterator();
// override the amConf by using DAG level configuration
@@ -506,6 +512,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
PlanKeyValuePair keyValPair = iter.next();
TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+ this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue());
}
this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
this.userName = appUserName;
@@ -1626,7 +1633,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.eventHandler, dag.taskCommunicatorManagerInterface,
dag.clock, dag.taskHeartbeatHandler,
!dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
- dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
+ dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker,
+ dag.dagOnlyConf);
return v;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 28a1c5e..ec7db61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -723,7 +723,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskSpec taskSpec = new TaskSpec(attemptId,
baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
- baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+ baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(),
+ baseTaskSpec.getTaskConf());
return new TaskAttemptImpl(attemptId, eventHandler,
taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
(failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6b79e98..01bca8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -236,7 +236,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private long cachedCountersTimestamp = 0;
private Resource taskResource;
+ // Merged/combined vertex level config
private Configuration vertexConf;
+ // Vertex specific configs only ( include the dag specific configs too )
+ // Useful when trying to serialize only the diff from global configs
+ @VisibleForTesting
+ Configuration vertexOnlyConf;
private final boolean isSpeculationEnabled;
@@ -854,19 +859,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
- StateChangeNotifier entityStatusTracker) {
+ StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.weakIntern(vertexName);
this.vertexConf = new Configuration(dagConf);
- // override dag configuration by using vertex's specified configuration
+ this.vertexOnlyConf = new Configuration(dagOnlyConf);
if (vertexPlan.hasVertexConf()) {
ConfigurationProto confProto = vertexPlan.getVertexConf();
for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+ vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
}
}
+
+
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
@@ -1567,7 +1575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return TaskSpec.createBaseTaskSpec(getDAG().getName(),
getName(), getTotalTasks(), getProcessorDescriptor(),
getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
- getGroupInputSpecList(taskIndex));
+ getGroupInputSpecList(taskIndex), vertexOnlyConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6b30a24..d165272 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2261,6 +2261,8 @@ public class TestVertexImpl {
LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
vertices = new HashMap<String, VertexImpl>();
vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
+ Configuration dagConf = new Configuration(false);
+ dagConf.set("abc", "foobar");
for (int i = 0; i < vCnt; ++i) {
VertexPlan vPlan = dagPlan.getVertex(i);
String vName = vPlan.getName();
@@ -2272,17 +2274,18 @@ public class TestVertexImpl {
if (customInitializer == null) {
v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
- clock, thh, appContext, locationHint, dispatcher, updateTracker);
+ clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf);
} else {
v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
- clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
+ clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker,
+ dagConf);
}
} else {
v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
}
vertices.put(vName, v);
vertexIdMap.put(vertexId, v);
@@ -3126,6 +3129,7 @@ public class TestVertexImpl {
TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
Assert.assertEquals(mockLocation, event.getTaskLocationHint());
Assert.assertNotNull(event.getBaseTaskSpec());
+ Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc"));
}
@Test(timeout = 5000)
@@ -5422,7 +5426,7 @@ public class TestVertexImpl {
VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, new Configuration(false));
v.setInputVertices(new HashMap());
vertexIdMap.put(vId, v);
vertices.put(v.getName(), v);
@@ -5457,11 +5461,12 @@ public class TestVertexImpl {
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
InputInitializer presetInitializer,
- StateChangeNotifier updateTracker) {
+ StateChangeNotifier updateTracker,
+ Configuration dagConf) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskCommunicatorManagerInterface, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
this.presetInitializer = presetInitializer;
}
@@ -5496,11 +5501,12 @@ public class TestVertexImpl {
AppContext appContext,
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
- StateChangeNotifier updateTracker) {
+ StateChangeNotifier updateTracker,
+ Configuration dagConf) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskCommunicatorManagerInterface, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
this.dispatcher = dispatcher;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 8bd288a..b1976c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -49,6 +49,8 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
@@ -60,6 +62,7 @@ import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -433,13 +436,18 @@ public class TestVertexImpl2 {
ExecutionContextTestInfoHolder vertexInfo) {
VertexPlan vertexPlan = createVertexPlanForExeuctionContextTests(vertexInfo);
VertexWrapper vertexWrapper =
- new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false));
+ new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false), true);
return vertexWrapper;
}
private VertexPlan createVertexPlanForExeuctionContextTests(ExecutionContextTestInfoHolder info) {
+ ConfigurationProto confProto = ConfigurationProto.newBuilder()
+ .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build())
+ .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build())
+ .build();
VertexPlan.Builder vertexPlanBuilder = VertexPlan.newBuilder()
.setName(info.vertexName)
+ .setVertexConf(confProto)
.setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
.setNumTasks(10)
.setJavaOpts("dontcare")
@@ -502,7 +510,8 @@ public class TestVertexImpl2 {
final VertexImpl vertex;
final VertexPlan vertexPlan;
- VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf) {
+ VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf,
+ boolean checkVertexOnlyConf) {
if (appContext == null) {
mockAppContext = createDefaultMockAppContext();
DAG mockDag = mock(DAG.class);
@@ -512,6 +521,9 @@ public class TestVertexImpl2 {
mockAppContext = appContext;
}
+ Configuration dagConf = new Configuration(false);
+ dagConf.set("abc1", "xyz1");
+ dagConf.set("foo1", "bar1");
this.vertexPlan = vertexPlan;
@@ -520,11 +532,19 @@ public class TestVertexImpl2 {
"testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class),
mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
- new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));
+ new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class),
+ dagConf);
+
+ if (checkVertexOnlyConf) {
+ Assert.assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1"));
+ Assert.assertEquals("bar2", vertex.vertexOnlyConf.get("foo1"));
+ Assert.assertEquals("bar", vertex.vertexOnlyConf.get("foo"));
+ }
+
}
VertexWrapper(VertexPlan vertexPlan, Configuration conf) {
- this(null, vertexPlan, conf);
+ this(null, vertexPlan, conf, false);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 99c85ab..a45f092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1417,7 +1417,7 @@ public class TestContainerReuse {
Collections.singletonList(new InputSpec("vertexName",
InputDescriptor.create("inputClassName"), 1)),
Collections.singletonList(new OutputSpec("vertexName",
- OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
+ OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint,
priority.getPriority(), containerContext, 0, 0, 0);
return lr;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
index 60ebc53..25d61d0 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -15,12 +15,18 @@
package org.apache.tez.util;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
@@ -29,6 +35,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto.Builder;
public class ProtoConverters {
@@ -65,15 +72,24 @@ public class ProtoConverters {
}
}
+ Configuration taskConf = null;
+ if (taskSpecProto.hasTaskConf()) {
+ taskConf = new Configuration(false);
+ Map<String, String> confMap =
+ DagTypeConverters.convertConfFromProto(taskSpecProto.getTaskConf());
+ for (Entry<String, String> e : confMap.entrySet()) {
+ taskConf.set(e.getKey(), e.getValue());
+ }
+ }
TaskSpec taskSpec =
new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
- outputSpecList, groupInputSpecs);
+ outputSpecList, groupInputSpecs, taskConf);
return taskSpec;
}
public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
- TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+ Builder builder = TaskSpecProto.newBuilder();
builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
builder.setDagName(taskSpec.getDAGName());
builder.setVertexName(taskSpec.getVertexName());
@@ -102,6 +118,17 @@ public class ProtoConverters {
}
}
+ if (taskSpec.getTaskConf() != null) {
+ ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder();
+ Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ confBuilder.addConfKeyValues(PlanKeyValuePair.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue()).build());
+ }
+ builder.setTaskConf(confBuilder.build());
+ }
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
index 2f8b2e6..a01a299 100644
--- a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -44,6 +44,7 @@ message TaskSpecProto {
repeated IOSpecProto output_specs = 6;
repeated GroupInputSpecProto grouped_input_specs = 7;
optional int32 vertex_parallelism = 8;
+ optional ConfigurationProto task_conf = 9;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 0c1dc66..05bcd98 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -192,7 +192,7 @@ public class TestMROutput {
dagName, vertexName, -1,
procDesc,
inputSpecs,
- outputSpecs, null);
+ outputSpecs, null, null);
FileSystem fs = FileSystem.getLocal(conf);
Path workDir =
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 133ef9e..8309966 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -213,7 +213,7 @@ public class MapUtils {
dagName, vertexName, -1,
mapProcessorDesc,
inputSpecs,
- outputSpecs, null);
+ outputSpecs, null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 382bc0e..1922c53 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -205,7 +205,7 @@ public class TestReduceProcessor {
reduceVertexName, -1,
reduceProcessorDesc,
Collections.singletonList(reduceInputSpec),
- Collections.singletonList(reduceOutputSpec), null);
+ Collections.singletonList(reduceOutputSpec), null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 0863e65..e49791f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -1054,4 +1054,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
public HadoopShim getHadoopShim() {
return hadoopShim;
}
+
+ @Private
+ @VisibleForTesting
+ public Configuration getTaskConf() {
+ return tezConf;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 7fce1d4..78bb1e9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -42,6 +44,7 @@ public class TaskSpec implements Writable {
private List<OutputSpec> outputSpecList;
private List<GroupInputSpec> groupInputSpecList;
private int vertexParallelism = -1;
+ private Configuration taskConf;
public TaskSpec() {
}
@@ -49,17 +52,27 @@ public class TaskSpec implements Writable {
public static TaskSpec createBaseTaskSpec(String dagName, String vertexName,
int vertexParallelism, ProcessorDescriptor processorDescriptor,
List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
- @Nullable List<GroupInputSpec> groupInputSpecList) {
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
- outputSpecList, groupInputSpecList);
+ outputSpecList, groupInputSpecList, taskConf);
}
public TaskSpec(
String dagName, String vertexName,
int vertexParallelism,
ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
@Nullable List<GroupInputSpec> groupInputSpecList) {
+ this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+ outputSpecList, groupInputSpecList, null);
+ }
+
+ public TaskSpec(
+ String dagName, String vertexName,
+ int vertexParallelism,
+ ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
checkNotNull(processorDescriptor, "processorDescriptor is null");
@@ -73,14 +86,25 @@ public class TaskSpec implements Writable {
this.outputSpecList = outputSpecList;
this.groupInputSpecList = groupInputSpecList;
this.vertexParallelism = vertexParallelism;
+ this.taskConf = taskConf;
+ }
+
+ public TaskSpec(TezTaskAttemptID taskAttemptID,
+ String dagName, String vertexName,
+ int vertexParallelism,
+ ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList) {
+ this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+ outputSpecList, groupInputSpecList, null);
}
public TaskSpec(TezTaskAttemptID taskAttemptID,
String dagName, String vertexName,
int vertexParallelism,
ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
- @Nullable List<GroupInputSpec> groupInputSpecList) {
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
checkNotNull(taskAttemptID, "taskAttemptID is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
@@ -95,6 +119,7 @@ public class TaskSpec implements Writable {
this.outputSpecList = outputSpecList;
this.groupInputSpecList = groupInputSpecList;
this.vertexParallelism = vertexParallelism;
+ this.taskConf = taskConf;
}
public String getDAGName() {
@@ -133,6 +158,10 @@ public class TaskSpec implements Writable {
return groupInputSpecList;
}
+ public Configuration getTaskConf() {
+ return taskConf;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
taskAttemptId.write(out);
@@ -157,6 +186,12 @@ public class TaskSpec implements Writable {
} else {
out.writeBoolean(false);
}
+ if (taskConf != null) {
+ out.writeBoolean(true);
+ taskConf.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
}
@Override
@@ -192,6 +227,11 @@ public class TaskSpec implements Writable {
groupInputSpecList.add(group);
}
}
+ boolean hasVertexConf = in.readBoolean();
+ if (hasVertexConf) {
+ taskConf = new Configuration(false);
+ taskConf.readFields(in);
+ }
}
@Override
@@ -220,6 +260,9 @@ public class TaskSpec implements Writable {
}
sb.append("]");
}
+ if (taskConf != null) {
+ sb.append(", taskConfEntryCount=" + taskConf.size());
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index afa08e7..96f8474 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -17,7 +17,9 @@ package org.apache.tez.runtime.task;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -93,6 +95,7 @@ public class TezTaskRunner2 {
private final Condition oobSignalCondition = oobSignalLock.newCondition();
private volatile long taskKillStartTime = 0;
+ final Configuration taskConf;
private final HadoopShim hadoopShim;
@@ -114,7 +117,15 @@ public class TezTaskRunner2 {
this.executor = executor;
this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
this.hadoopShim = hadoopShim;
- this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
+ this.taskConf = new Configuration(tezConf);
+ if (taskSpec.getTaskConf() != null) {
+ Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ taskConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs,
umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 00e830f..ecfc424 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -190,7 +190,7 @@ public class TestLogicalIOProcessorRuntimeTask {
ProcessorDescriptor processorDesc = createProcessorDescriptor();
TaskSpec taskSpec = new TaskSpec(taskAttemptID,
dagName, vertexName, parallelism, processorDesc,
- createInputSpecList(), createOutputSpecList(), null);
+ createInputSpecList(), createOutputSpecList(), null, null);
return taskSpec;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
new file mode 100644
index 0000000..dfe19f8
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTaskSpec {
+
+ @Test (timeout = 5000)
+ public void testSerDe() throws IOException {
+ ByteBuffer payload = null;
+ ProcessorDescriptor procDesc = ProcessorDescriptor.create("proc").setUserPayload(
+ UserPayload.create(payload)).setHistoryText("historyText");
+
+ List<InputSpec> inputSpecs = new ArrayList<>();
+ InputSpec inputSpec = new InputSpec("src1", InputDescriptor.create("inputClass"),10);
+ inputSpecs.add(inputSpec);
+ List<OutputSpec> outputSpecs = new ArrayList<>();
+ OutputSpec outputSpec = new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999);
+ outputSpecs.add(outputSpec);
+ List<GroupInputSpec> groupInputSpecs = null;
+
+ Configuration taskConf = new Configuration(false);
+ taskConf.set("foo", "bar");
+
+ TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(
+ TezVertexID.getInstance(TezDAGID.getInstance("1234", 1, 1), 1), 1), 1);
+ TaskSpec taskSpec = new TaskSpec(taId, "dagName", "vName", -1, procDesc, inputSpecs, outputSpecs,
+ groupInputSpecs, taskConf);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(bos);
+ taskSpec.write(out);
+
+ TaskSpec deSerTaskSpec = new TaskSpec();
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ DataInput in = new DataInputStream(bis);
+ deSerTaskSpec.readFields(in);
+
+ Assert.assertEquals(taskSpec.getDAGName(), deSerTaskSpec.getDAGName());
+ Assert.assertEquals(taskSpec.getVertexName(), deSerTaskSpec.getVertexName());
+ Assert.assertEquals(taskSpec.getVertexParallelism(), deSerTaskSpec.getVertexParallelism());
+ Assert.assertEquals(taskSpec.getInputs().size(), deSerTaskSpec.getInputs().size());
+ Assert.assertEquals(taskSpec.getOutputs().size(), deSerTaskSpec.getOutputs().size());
+ Assert.assertNull(deSerTaskSpec.getGroupInputs());
+ Assert.assertEquals(taskSpec.getInputs().get(0).getSourceVertexName(),
+ deSerTaskSpec.getInputs().get(0).getSourceVertexName());
+ Assert.assertEquals(taskSpec.getOutputs().get(0).getDestinationVertexName(),
+ deSerTaskSpec.getOutputs().get(0).getDestinationVertexName());
+
+ Assert.assertEquals(taskConf.get("foo"), deSerTaskSpec.getTaskConf().get("foo"));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index c3b9abd..6cb49fa 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -740,7 +740,7 @@ public class TestTaskExecution2 {
.setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
TaskSpec taskSpec =
new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
- new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+ new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null);
TezTaskRunner2 taskRunner;
if (testRunner) {
http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
new file mode 100644
index 0000000..f58421a
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestTezTaskRunner2 {
+
+ @Test (timeout = 5000)
+ public void testTaskConfUsage() throws Exception {
+ Configuration conf = new Configuration(false);
+ conf.set("global", "global1");
+ conf.set("global_override", "global1");
+ String[] localDirs = null;
+ Configuration taskConf = new Configuration(false);
+ conf.set("global_override", "task1");
+ conf.set("task", "task1");
+
+ List<InputSpec> inputSpecList = new ArrayList<>();
+ List<OutputSpec> outputSpecList = new ArrayList<>();
+ TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
+ inputSpecList, outputSpecList, null, taskConf);
+ TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class),
+ localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
+ null, 1000, false, new DefaultHadoopShim());
+
+ Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global"));
+ Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override"));
+ Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task"));
+ }
+
+
+}