You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/18 04:23:38 UTC
git commit: TEZ-837. Remove numTasks from VertexLocationHints. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master e5ee79198 -> 8a4a62b76
TEZ-837. Remove numTasks from VertexLocationHints. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8a4a62b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8a4a62b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8a4a62b7
Branch: refs/heads/master
Commit: 8a4a62b763a0f91d8669a87303ba3dcee4d9c361
Parents: e5ee791
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 17 19:23:18 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 17 19:23:18 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 4 ++-
.../org/apache/tez/client/PreWarmContext.java | 29 ++++++++++++++------
.../apache/tez/dag/api/DagTypeConverters.java | 6 ++--
.../java/org/apache/tez/dag/api/Vertex.java | 2 +-
.../apache/tez/dag/api/VertexLocationHint.java | 20 --------------
tez-api/src/main/proto/DAGApiRecords.proto | 4 +--
.../org/apache/tez/dag/app/DAGAppMaster.java | 8 +++---
.../mapreduce/examples/OrderedWordCount.java | 4 +--
8 files changed, 35 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86931be..433db18 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,7 +14,9 @@ INCOMPATIBLE CHANGES
TEZ-827. Separate initialize and start operations on Inputs/Outputs.
- TEZ-668. Allow Processors to control Input/Output start
+ TEZ-668. Allow Processors to control Input/Output start.
+
+ TEZ-837. Remove numTasks from VertexLocationHints.
Release 0.2.0 - 2013-11-30
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java b/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
index 39eb20f..ddd117f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
+++ b/tez-api/src/main/java/org/apache/tez/client/PreWarmContext.java
@@ -37,6 +37,7 @@ public class PreWarmContext {
private final ProcessorDescriptor processorDescriptor;
private final Resource resource;
+ private final int numTasks;
private final VertexLocationHint locationHints;
private Map<String, LocalResource> localResources;
private Map<String, String> environment;
@@ -44,21 +45,27 @@ public class PreWarmContext {
/**
* Context to define how to pre-warm a TezSession.
- * @param processorDescriptor The processor to run within a Tez Task
- * after launching a container
- * @param resource The resource requirements for each container
- * @param locationHints The num of tasks to run as well as the location hints
- * for the containers to be launched.
- * The num of tasks can drive how many containers are launched.
- * However, as containers are re-used, the total number
- * of launched containers will likely be less than the
- * specified number of tasks.
+ *
+ * @param processorDescriptor
+ * The processor to run within a Tez Task after launching a container
+ * @param resource
+ * The resource requirements for each container
+ * @param numTasks
+ * The number of tasks to run. The num of tasks can drive how many
+ * containers are launched. However, as containers are re-used, the
+ * total number of launched containers will likely be less than the
+ * specified number of tasks.
+ * @param locationHints
+ * The location hints for the containers to be launched.
+ *
*/
public PreWarmContext(ProcessorDescriptor processorDescriptor,
Resource resource,
+ int numTasks,
VertexLocationHint locationHints) {
this.processorDescriptor = processorDescriptor;
this.resource = resource;
+ this.numTasks = numTasks;
this.locationHints = locationHints;
}
@@ -103,6 +110,10 @@ public class PreWarmContext {
return resource;
}
+ public int getNumTasks() {
+ return numTasks;
+ }
+
public VertexLocationHint getLocationHints() {
return locationHints;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index ed606c2..10e69bd 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -175,7 +175,7 @@ public class DagTypeConverters {
new HashSet<String>(inputHint.getRackList()));
outputList.add(outputHint);
}
- return new VertexLocationHint(outputList.size(), outputList);
+ return new VertexLocationHint(outputList);
}
// notes re HDFS URL handling:
@@ -553,14 +553,13 @@ public class DagTypeConverters {
outputList.add(outputHint);
}
- return new VertexLocationHint(proto.getNumTasks(), outputList);
+ return new VertexLocationHint(outputList);
}
public static VertexLocationHintProto convertVertexLocationHintToProto(
VertexLocationHint vertexLocationHint) {
VertexLocationHintProto.Builder builder =
VertexLocationHintProto.newBuilder();
- builder.setNumTasks(vertexLocationHint.getNumTasks());
if (vertexLocationHint.getTaskLocationHints() != null) {
for (TaskLocationHint taskLocationHint :
vertexLocationHint.getTaskLocationHints()) {
@@ -624,6 +623,7 @@ public class DagTypeConverters {
DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
proto.getProcessorDescriptor()),
Resource.newInstance(proto.getMemoryMb(), proto.getVirtualCores()),
+ proto.getNumTasks(),
vertexLocationHint);
if (proto.hasLocalResources()) {
context.setLocalResources(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 377f551..03f9d66 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -98,7 +98,7 @@ public class Vertex {
return this;
}
assert locations.size() == parallelism;
- taskLocationsHint = new VertexLocationHint(parallelism, locations);
+ taskLocationsHint = new VertexLocationHint(locations);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 56c55dc..8c0fe35 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -24,22 +24,10 @@ import java.util.Set;
public class VertexLocationHint {
- private final int numTasks;
private final List<TaskLocationHint> taskLocationHints;
- @Deprecated // TODO TEZ-837 Remove in follow up jira
- public VertexLocationHint(int numTasks,
- List<TaskLocationHint> taskLocationHints) {
- this.numTasks = numTasks;
- if (taskLocationHints != null) {
- this.taskLocationHints = Collections.unmodifiableList(taskLocationHints);
- } else {
- this.taskLocationHints = null;
- }
- }
public VertexLocationHint(List<TaskLocationHint> taskLocationHints) {
- this.numTasks = 0;
if (taskLocationHints != null) {
this.taskLocationHints = Collections.unmodifiableList(taskLocationHints);
} else {
@@ -47,10 +35,6 @@ public class VertexLocationHint {
}
}
- public int getNumTasks() {
- return numTasks;
- }
-
public List<TaskLocationHint> getTaskLocationHints() {
return taskLocationHints;
}
@@ -59,7 +43,6 @@ public class VertexLocationHint {
public int hashCode() {
final int prime = 7883;
int result = 1;
- result = prime * result + numTasks;
if (taskLocationHints != null) {
result = prime * result + taskLocationHints.hashCode();
}
@@ -78,9 +61,6 @@ public class VertexLocationHint {
return false;
}
VertexLocationHint other = (VertexLocationHint) obj;
- if (numTasks != other.numTasks) {
- return false;
- }
if (taskLocationHints != null) {
if (!taskLocationHints.equals(other.taskLocationHints)) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index b7a2c60..5faa7f1 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -236,8 +236,7 @@ enum StatusGetOptsProto {
}
message VertexLocationHintProto {
- optional int32 num_tasks = 1;
- repeated PlanTaskLocationHint task_location_hints = 2;
+ repeated PlanTaskLocationHint task_location_hints = 1;
}
message PreWarmContextProto {
@@ -248,4 +247,5 @@ message PreWarmContextProto {
optional PlanLocalResourcesProto localResources = 5;
repeated PlanKeyValuePair environmentSetting = 6;
optional string java_opts = 7;
+ optional int32 num_tasks = 8;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f01e71d..9e2e377 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -870,15 +870,15 @@ public class DAGAppMaster extends AbstractService {
new org.apache.tez.dag.api.DAG(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX +
Integer.toString(dagCounter.get() + 1));
- if (preWarmContext.getLocationHints().getNumTasks() <= 0) {
+ if (preWarmContext.getNumTasks() <= 0) {
LOG.warn("Ignoring pre-warm context as invalid numContainers specified: "
- + preWarmContext.getLocationHints().getNumTasks());
+ + preWarmContext.getNumTasks());
return;
}
org.apache.tez.dag.api.Vertex preWarmVertex = new
org.apache.tez.dag.api.Vertex("PreWarmVertex",
preWarmContext.getProcessorDescriptor(),
- preWarmContext.getLocationHints().getNumTasks(), preWarmContext.getResource());
+ preWarmContext.getNumTasks(), preWarmContext.getResource());
if (preWarmContext.getEnvironment() != null) {
preWarmVertex.setTaskEnvironment(preWarmContext.getEnvironment());
}
@@ -895,7 +895,7 @@ public class DAGAppMaster extends AbstractService {
dag.addVertex(preWarmVertex);
LOG.info("Pre-warming containers"
+ ", processor=" + preWarmContext.getProcessorDescriptor().getClassName()
- + ", numContainers=" + preWarmContext.getLocationHints().getNumTasks()
+ + ", numContainers=" + preWarmContext.getNumTasks()
+ ", containerResource=" + preWarmContext.getResource());
startDAG(dag.createDag(amConf));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a4a62b7/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index c418bb2..a9349e5 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -426,7 +426,7 @@ public class OrderedWordCount {
if (doPreWarm) {
LOG.info("Pre-warming Session");
VertexLocationHint vertexLocationHint =
- new VertexLocationHint(preWarmNumContainers, null);
+ new VertexLocationHint(null);
ProcessorDescriptor sleepProcDescriptor =
new ProcessorDescriptor(SleepProcessor.class.getName());
SleepProcessor.SleepProcessorConfig sleepProcessorConfig =
@@ -434,7 +434,7 @@ public class OrderedWordCount {
sleepProcDescriptor.setUserPayload(
sleepProcessorConfig.toUserPayload());
PreWarmContext context = new PreWarmContext(sleepProcDescriptor,
- dag.getVertex("initialmap").getTaskResource(),
+ dag.getVertex("initialmap").getTaskResource(), preWarmNumContainers,
vertexLocationHint);
Map<String, LocalResource> contextLocalRsrcs =