You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/07 19:53:00 UTC
tez git commit: TEZ-3286. Allow clients to set processor reserved
memory per vertex (instead of per container). (hitesh)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 b06afabb9 -> 8eb707e4e
TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8eb707e4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8eb707e4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8eb707e4
Branch: refs/heads/branch-0.7
Commit: 8eb707e4e4ef8113d0265d21f1f0c0cb729e066f
Parents: b06afab
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jul 7 12:52:31 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jul 7 12:52:31 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +--
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 6 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 22 +++--
.../tez/dag/app/dag/impl/TestVertexImpl2.java | 18 +++-
.../tez/dag/app/rm/TestContainerReuse.java | 2 +-
.../tez/mapreduce/output/TestMROutput.java | 2 +-
.../tez/mapreduce/processor/MapUtils.java | 2 +-
.../processor/reduce/TestReduceProcessor.java | 2 +-
.../runtime/LogicalIOProcessorRuntimeTask.java | 6 ++
.../apache/tez/runtime/api/impl/TaskSpec.java | 53 ++++++++++--
.../apache/tez/runtime/task/TezTaskRunner.java | 17 +++-
.../TestLogicalIOProcessorRuntimeTask.java | 2 +-
.../tez/runtime/api/impl/TestTaskSpec.java | 91 ++++++++++++++++++++
.../tez/runtime/task/TestTezTaskRunner.java | 61 +++++++++++++
17 files changed, 288 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8696a59..4626e93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
TEZ-3223. Support a NullHistoryLogger to disable history logging if needed.
TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
TEZ-3305. TestAnalyzer fails on Hadoop 2.7.
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 3bbc0f9..a96d560 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -749,7 +749,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
+ "scale.memory.enabled";
@Private
@@ -760,7 +760,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
+ "scale.memory.allocator.class";
@Private
@@ -773,7 +773,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
+ "scale.memory.reserve-fraction";
@Private
@@ -785,7 +785,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
@@ -794,7 +794,7 @@ public class TezConfiguration extends Configuration {
/**
* Max cumulative total reservation for additional IOs.
*/
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
/*
@@ -804,7 +804,7 @@ public class TezConfiguration extends Configuration {
*/
@Private
@Unstable
- @ConfigurationScope(Scope.AM)
+ @ConfigurationScope(Scope.VERTEX)
public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
TEZ_TASK_PREFIX + "scale.memory.ratios";
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6b98a7b..b7ad3b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -194,7 +194,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private long cachedCountersTimestamp = 0;
private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
+ // Combined configs for the DAG
private final Configuration dagConf;
+ // DAG specific configs only
+ // Useful when trying to serialize only the diff from global configs
+ private final Configuration dagOnlyConf;
+
private final DAGPlan jobPlan;
private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
@@ -502,6 +507,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.dagId = dagId;
this.jobPlan = jobPlan;
this.dagConf = new Configuration(amConf);
+ this.dagOnlyConf = new Configuration(false);
Iterator<PlanKeyValuePair> iter =
jobPlan.getDagConf().getConfKeyValuesList().iterator();
// override the amConf by using DAG level configuration
@@ -509,6 +515,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
PlanKeyValuePair keyValPair = iter.next();
TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+ this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue());
}
this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
this.userName = appUserName;
@@ -1649,7 +1656,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
dag.eventHandler, dag.taskAttemptListener,
dag.clock, dag.taskHeartbeatHandler,
!dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
- dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
+ dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker,
+ dag.dagOnlyConf);
return v;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4a0742f..2dfd7f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -830,9 +830,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
TaskSpec taskSpec = null;
if (baseTaskSpec != null) {
- taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+ taskSpec = new TaskSpec(attemptId,
+ baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
- baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+ baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(),
+ baseTaskSpec.getTaskConf());
}
return new TaskAttemptImpl(attemptId, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0d6bc68..1e875d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -231,7 +231,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private long cachedCountersTimestamp = 0;
private Resource taskResource;
+ // Merged/combined vertex level config
private Configuration vertexConf;
+ // Vertex specific configs only ( include the dag specific configs too )
+ // Useful when trying to serialize only the diff from global configs
+ @VisibleForTesting
+ Configuration vertexOnlyConf;
private final boolean isSpeculationEnabled;
@@ -875,19 +880,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
- StateChangeNotifier entityStatusTracker) {
+ StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.weakIntern(vertexName);
this.vertexConf = new Configuration(dagConf);
- // override dag configuration by using vertex's specified configuration
+ this.vertexOnlyConf = new Configuration(dagOnlyConf);
if (vertexPlan.hasVertexConf()) {
ConfigurationProto confProto = vertexPlan.getVertexConf();
for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+ vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
}
}
+
+
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
@@ -1538,7 +1546,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return TaskSpec.createBaseTaskSpec(getDAG().getName(),
getName(), getTotalTasks(), getProcessorDescriptor(),
getInputSpecList(taskIndex), getOutputSpecList(taskIndex),
- getGroupInputSpecList(taskIndex));
+ getGroupInputSpecList(taskIndex), vertexOnlyConf);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index f0a8625..520d10f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2258,6 +2258,8 @@ public class TestVertexImpl {
LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
vertices = new HashMap<String, VertexImpl>();
vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
+ Configuration dagConf = new Configuration(false);
+ dagConf.set("abc", "foobar");
for (int i = 0; i < vCnt; ++i) {
VertexPlan vPlan = dagPlan.getVertex(i);
String vName = vPlan.getName();
@@ -2269,17 +2271,18 @@ public class TestVertexImpl {
if (customInitializer == null) {
v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, appContext, locationHint, dispatcher, updateTracker);
+ clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf);
} else {
v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
- clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
+ clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker,
+ dagConf);
}
} else {
v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
}
vertices.put(vName, v);
vertexIdMap.put(vertexId, v);
@@ -3120,6 +3123,7 @@ public class TestVertexImpl {
TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
Assert.assertEquals(mockLocation, event.getTaskLocationHint());
Assert.assertNotNull(event.getBaseTaskSpec());
+ Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc"));
}
@Test(timeout = 5000)
@@ -5417,7 +5421,7 @@ public class TestVertexImpl {
VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
dispatcher.getEventHandler(), taskAttemptListener,
clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, new Configuration(false));
v.setInputVertices(new HashMap());
vertexIdMap.put(vId, v);
vertices.put(v.getName(), v);
@@ -5452,11 +5456,12 @@ public class TestVertexImpl {
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
InputInitializer presetInitializer,
- StateChangeNotifier updateTracker) {
+ StateChangeNotifier updateTracker,
+ Configuration dagConf) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskAttemptListener, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
this.presetInitializer = presetInitializer;
}
@@ -5491,11 +5496,12 @@ public class TestVertexImpl {
AppContext appContext,
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
- StateChangeNotifier updateTracker) {
+ StateChangeNotifier updateTracker,
+ Configuration dagConf) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
taskAttemptListener, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
- updateTracker);
+ updateTracker, dagConf);
this.dispatcher = dispatcher;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index b4064a0..483c3a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -39,6 +39,8 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
@@ -271,8 +273,14 @@ public class TestVertexImpl2 {
doReturn(new Credentials()).when(mockDag).getCredentials();
doReturn(mockDag).when(mockAppContext).getCurrentDAG();
+ ConfigurationProto confProto = ConfigurationProto.newBuilder()
+ .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build())
+ .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build())
+ .build();
+
vertexPlan = DAGProtos.VertexPlan.newBuilder()
.setName(vertexName)
+ .setVertexConf(confProto)
.setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
.setJavaOpts(initialJavaOpts)
.setNumTasks(numTasks)
@@ -286,12 +294,20 @@ public class TestVertexImpl2 {
.build())
.setType(DAGProtos.PlanVertexType.NORMAL).build();
+ Configuration dagConf = new Configuration(false);
+ dagConf.set("abc1", "xyz1");
+ dagConf.set("foo1", "bar1");
+
vertex =
new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan,
"testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class),
mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
- new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));
+ new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class), dagConf);
+
+ assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1"));
+ assertEquals("bar2", vertex.vertexOnlyConf.get("foo1"));
+ assertEquals("bar", vertex.vertexOnlyConf.get("foo"));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 7ad044b..d944e5d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1455,7 +1455,7 @@ public class TestContainerReuse {
Collections.singletonList(new InputSpec("vertexName",
InputDescriptor.create("inputClassName"), 1)),
Collections.singletonList(new OutputSpec("vertexName",
- OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
+ OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint,
priority.getPriority(), containerContext);
return lr;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 45a24cb..bfeaad8 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -191,7 +191,7 @@ public class TestMROutput {
dagName, vertexName, -1,
procDesc,
inputSpecs,
- outputSpecs, null);
+ outputSpecs, null, null);
FileSystem fs = FileSystem.getLocal(conf);
Path workDir =
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8841882..0d0eace 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -212,7 +212,7 @@ public class MapUtils {
dagName, vertexName, -1,
mapProcessorDesc,
inputSpecs,
- outputSpecs, null);
+ outputSpecs, null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index fcb42b3..21d2929 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -204,7 +204,7 @@ public class TestReduceProcessor {
reduceVertexName, -1,
reduceProcessorDesc,
Collections.singletonList(reduceInputSpec),
- Collections.singletonList(reduceOutputSpec), null);
+ Collections.singletonList(reduceOutputSpec), null, null);
Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 11ef31d..70e2a18 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -967,4 +967,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
return this.outputsMap;
}
+ @Private
+ @VisibleForTesting
+ public Configuration getTaskConf() {
+ return tezConf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 4dc57e2..e082bf8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -42,6 +44,7 @@ public class TaskSpec implements Writable {
private List<OutputSpec> outputSpecList;
private List<GroupInputSpec> groupInputSpecList;
private int vertexParallelism = -1;
+ private Configuration taskConf;
public TaskSpec() {
}
@@ -49,17 +52,27 @@ public class TaskSpec implements Writable {
public static TaskSpec createBaseTaskSpec(String dagName, String vertexName,
int vertexParallelism, ProcessorDescriptor processorDescriptor,
List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
- @Nullable List<GroupInputSpec> groupInputSpecList) {
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
- outputSpecList, groupInputSpecList);
+ outputSpecList, groupInputSpecList, taskConf);
}
public TaskSpec(
String dagName, String vertexName,
int vertexParallelism,
ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
@Nullable List<GroupInputSpec> groupInputSpecList) {
+ this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+ outputSpecList, groupInputSpecList, null);
+ }
+
+ public TaskSpec(
+ String dagName, String vertexName,
+ int vertexParallelism,
+ ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
checkNotNull(processorDescriptor, "processorDescriptor is null");
@@ -73,14 +86,25 @@ public class TaskSpec implements Writable {
this.outputSpecList = outputSpecList;
this.groupInputSpecList = groupInputSpecList;
this.vertexParallelism = vertexParallelism;
+ this.taskConf = taskConf;
+ }
+
+ public TaskSpec(TezTaskAttemptID taskAttemptID,
+ String dagName, String vertexName,
+ int vertexParallelism,
+ ProcessorDescriptor processorDescriptor,
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList) {
+ this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+ outputSpecList, groupInputSpecList, null);
}
public TaskSpec(TezTaskAttemptID taskAttemptID,
String dagName, String vertexName,
int vertexParallelism,
ProcessorDescriptor processorDescriptor,
- List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
- @Nullable List<GroupInputSpec> groupInputSpecList) {
+ List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+ @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
checkNotNull(taskAttemptID, "taskAttemptID is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(vertexName, "vertexName is null");
@@ -95,6 +119,7 @@ public class TaskSpec implements Writable {
this.outputSpecList = outputSpecList;
this.groupInputSpecList = groupInputSpecList;
this.vertexParallelism = vertexParallelism;
+ this.taskConf = taskConf;
}
public String getDAGName() {
@@ -129,6 +154,10 @@ public class TaskSpec implements Writable {
return groupInputSpecList;
}
+ public Configuration getTaskConf() {
+ return taskConf;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
taskAttemptId.write(out);
@@ -153,6 +182,12 @@ public class TaskSpec implements Writable {
} else {
out.writeBoolean(false);
}
+ if (taskConf != null) {
+ out.writeBoolean(true);
+ taskConf.write(out);
+ } else {
+ out.writeBoolean(false);
+ }
}
@Override
@@ -188,6 +223,11 @@ public class TaskSpec implements Writable {
groupInputSpecList.add(group);
}
}
+ boolean hasVertexConf = in.readBoolean();
+ if (hasVertexConf) {
+ taskConf = new Configuration(false);
+ taskConf.readFields(in);
+ }
}
@Override
@@ -216,6 +256,9 @@ public class TaskSpec implements Writable {
}
sb.append("]");
}
+ if (taskConf != null) {
+ sb.append(", taskConfEntryCount=" + taskConf.size());
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index de83889..c4cae15 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -23,7 +23,9 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +46,7 @@ import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -53,11 +56,13 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner.class);
private final Configuration tezConf;
- private final LogicalIOProcessorRuntimeTask task;
+ @VisibleForTesting
+ final LogicalIOProcessorRuntimeTask task;
private final UserGroupInformation ugi;
private final TaskReporter taskReporter;
private final ListeningExecutorService executor;
+ final Configuration taskConf;
private volatile ListenableFuture<Void> taskFuture;
private volatile Thread waitingThread;
private volatile Throwable firstException;
@@ -78,7 +83,15 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
this.ugi = ugi;
this.taskReporter = taskReporter;
this.executor = executor;
- task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
+ this.taskConf = new Configuration(tezConf);
+ if (taskSpec.getTaskConf() != null) {
+ Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> entry = iter.next();
+ taskConf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, this,
serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
executionContext, memAvailable);
taskRunning = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 520ad69..08e909e 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -192,7 +192,7 @@ public class TestLogicalIOProcessorRuntimeTask {
ProcessorDescriptor processorDesc = createProcessorDescriptor();
TaskSpec taskSpec = new TaskSpec(taskAttemptID,
dagName, vertexName, parallelism, processorDesc,
- createInputSpecList(), createOutputSpecList(), null);
+ createInputSpecList(), createOutputSpecList(), null, null);
return taskSpec;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
new file mode 100644
index 0000000..f3cb49b
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTaskSpec {
+
+  /** Round-trips a TaskSpec through write()/readFields() and compares the two copies. */
+  @Test (timeout = 5000)
+  public void testSerDe() throws IOException {
+    ByteBuffer nullPayload = null;
+    UserPayload userPayload = UserPayload.create(nullPayload);
+    ProcessorDescriptor processor = ProcessorDescriptor.create("proc")
+        .setUserPayload(userPayload).setHistoryText("historyText");
+
+    List<InputSpec> inputs = new ArrayList<InputSpec>();
+    inputs.add(new InputSpec("src1", InputDescriptor.create("inputClass"), 10));
+    List<OutputSpec> outputs = new ArrayList<OutputSpec>();
+    outputs.add(new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999));
+    List<GroupInputSpec> groupInputs = null;
+
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("foo", "bar");
+
+    TezDAGID dagId = TezDAGID.getInstance("1234", 1, 1);
+    TezTaskID taskId = TezTaskID.getInstance(TezVertexID.getInstance(dagId, 1), 1);
+    TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 1);
+    TaskSpec original = new TaskSpec(attemptId, "dagName", "vName", -1, processor, inputs,
+        outputs, groupInputs, taskConf);
+
+    // Serialize into an in-memory buffer.
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutput sink = new DataOutputStream(buffer);
+    original.write(sink);
+
+    // Deserialize those bytes into an empty TaskSpec.
+    TaskSpec restored = new TaskSpec();
+    DataInput source = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+    restored.readFields(source);
+
+    Assert.assertEquals(original.getDAGName(), restored.getDAGName());
+    Assert.assertEquals(original.getVertexName(), restored.getVertexName());
+    Assert.assertEquals(original.getVertexParallelism(), restored.getVertexParallelism());
+    Assert.assertEquals(original.getInputs().size(), restored.getInputs().size());
+    Assert.assertEquals(original.getOutputs().size(), restored.getOutputs().size());
+    Assert.assertNull(restored.getGroupInputs());
+    Assert.assertEquals(original.getInputs().get(0).getSourceVertexName(),
+        restored.getInputs().get(0).getSourceVertexName());
+    Assert.assertEquals(original.getOutputs().get(0).getDestinationVertexName(),
+        restored.getOutputs().get(0).getDestinationVertexName());
+    Assert.assertEquals(taskConf.get("foo"), restored.getTaskConf().get("foo"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
new file mode 100644
index 0000000..c943e02
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTezTaskRunner {
+
+  /** Task-level conf carried by the TaskSpec must be layered over the global conf. */
+  @Test (timeout = 5000)
+  public void testTaskConfUsage() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set("global", "global1");
+    conf.set("global_override", "global1");
+    String[] localDirs = null;
+    // Set on taskConf, not conf: otherwise the task conf is empty and nothing is overridden.
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("global_override", "task1");
+    taskConf.set("task", "task1");
+
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>();
+    List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>();
+    TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
+        inputSpecList, outputSpecList, null, taskConf);
+    TezTaskRunner taskRunner = new TezTaskRunner(conf, mock(UserGroupInformation.class),
+        localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
+        null, 1000);
+
+    Assert.assertEquals("global1", taskRunner.task.getTaskConf().get("global"));
+    Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("global_override"));
+    Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("task"));
+  }
+}