You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/09 23:00:12 UTC
git commit: TEZ-1318. Simplify Vertex Constructor (bikas)
Repository: tez
Updated Branches:
refs/heads/master a1dd82912 -> f184b1a0e
TEZ-1318. Simplify Vertex Constructor (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f184b1a0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f184b1a0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f184b1a0
Branch: refs/heads/master
Commit: f184b1a0e183b70fb3de3282782ab9d5b8ca4c9a
Parents: a1dd829
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Aug 9 13:59:46 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Aug 9 13:59:46 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/client/AMConfiguration.java | 2 -
.../org/apache/tez/client/TezClientUtils.java | 2 +-
.../org/apache/tez/common/TezYARNUtils.java | 4 --
.../main/java/org/apache/tez/dag/api/DAG.java | 9 ++-
.../apache/tez/dag/api/TezConfiguration.java | 11 +++-
.../java/org/apache/tez/dag/api/Vertex.java | 64 ++++++++++++++++++--
.../apache/tez/examples/OrderedWordCount.java | 13 ++--
.../examples/BroadcastAndOneToOneExample.java | 13 +---
.../mapreduce/examples/FilterLinesByWord.java | 8 +--
.../examples/FilterLinesByWordOneToOne.java | 5 +-
.../mapreduce/examples/IntersectDataGen.java | 3 +-
.../mapreduce/examples/IntersectExample.java | 8 +--
.../mapreduce/examples/IntersectValidate.java | 23 +++----
.../tez/mapreduce/examples/MRRSleepJob.java | 9 +--
.../examples/TestOrderedWordCount.java | 9 +--
.../tez/mapreduce/examples/UnionExample.java | 21 ++-----
.../tez/mapreduce/examples/WordCount.java | 9 +--
17 files changed, 117 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 597a95d..9206e72 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -21,11 +21,9 @@ package org.apache.tez.client;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.TezConfiguration;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index a82393f..d05dd7e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -580,7 +580,7 @@ public class TezClientUtils {
amConfig.getTezConfiguration().set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH,
binaryPath.toUri().toString());
- DAGPlan dagPB = dag.createDag(null);
+ DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration());
FSDataOutputStream dagPBOutBinaryStream = null;
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
index 87ff828..de82e8a 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
@@ -23,8 +23,6 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
@@ -40,8 +38,6 @@ public class TezYARNUtils {
private static Pattern ENV_VARIABLE_PATTERN = Pattern.compile(Shell.getEnvironmentVariableRegex());
- private static Log LOG = LogFactory.getLog(TezYARNUtils.class);
-
public static String getFrameworkClasspath(Configuration conf, boolean usingArchive) {
Map<String, String> environment = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index e754990..b7ff1fb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -531,7 +531,14 @@ public class DAG {
}
for (Vertex vertex : vertices.values()) {
- // infer credentials and parallelism from data source
+ // infer credentials, resources and parallelism from data source
+ if (vertex.getTaskResource() == null) {
+ vertex.setTaskResource(Resource.newInstance(dagConf.getInt(
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT)));
+ }
List<DataSourceDescriptor> dataSources = vertex.getDataSources();
for (DataSourceDescriptor dataSource : dataSources) {
if (dataSource.getCredentials() != null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ce46bf3..29aae5b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -206,11 +206,20 @@ public class TezConfiguration extends Configuration {
/** The amount of memory to be used by the AppMaster */
public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
+ "resource.memory.mb";
- public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1536;
+ public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1024;
public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
+ "resource.cpu.vcores";
public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
+
+ /** The amount of memory to be used by each task */
+ public static final String TEZ_TASK_RESOURCE_MEMORY_MB = TEZ_TASK_PREFIX
+ + "resource.memory.mb";
+ public static final int TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT = 1024;
+
+ public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
+ + "resource.cpu.vcores";
+ public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1;
public static final String
TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 5f67ad1..654d4a1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -41,7 +41,7 @@ public class Vertex {
private int parallelism;
private VertexLocationHint locationHint;
- private final Resource taskResource;
+ private Resource taskResource;
private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
private Map<String, String> taskEnvironment = new HashMap<String, String>();
private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs
@@ -74,7 +74,7 @@ public class Vertex {
* reconfigurations.
* @param taskResource
* Physical resources like memory/cpu that are used by each task of this
- * vertex
+ * vertex.
*/
public Vertex(String vertexName,
ProcessorDescriptor processorDescriptor,
@@ -89,9 +89,51 @@ public class Vertex {
"Parallelism should be -1 if determined by the AM"
+ ", otherwise should be >= 0");
}
- if (taskResource == null) {
- throw new IllegalArgumentException("Resource cannot be null");
- }
+ }
+
+ /**
+ * Create a new vertex with the given name and parallelism. <br>
+ * The vertex task resource will be picked from configuration
+ * {@link TezConfiguration#TEZ_TASK_RESOURCE_MEMORY_MB} and
+ * {@link TezConfiguration#TEZ_TASK_RESOURCE_CPU_VCORES}. Applications that
+ * want more control over their task resource specification may create their
+ * own logic to determine task resources and use
+ * {@link Vertex#Vertex(String, ProcessorDescriptor, int, Resource)} to create
+ * the Vertex.
+ *
+ * @param vertexName
+ * Name of the vertex
+ * @param processorDescriptor
+ * Description of the processor that is executed in every task of
+ * this vertex
+ * @param parallelism
+ * Number of tasks in this vertex. Set to -1 if this is going to be
+ * decided at runtime. Parallelism may change at runtime due to graph
+ * reconfigurations.
+ */
+ public Vertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism) {
+ this(vertexName, processorDescriptor, parallelism, null);
+ }
+
+ /**
+ * Create a new vertex with the given name. <br>
+ * The vertex task resource will be picked from configuration <br>
+ * The vertex parallelism will be inferred. If it cannot be inferred then an
+ * error will be reported. This constructor may be used for vertices that have
+ * data sources, or are connected via 1-1 edges, or have runtime parallelism
+ * estimation via data source initializers or vertex managers. Calling this
+ * constructor is equivalent to calling
+ * {@link Vertex#Vertex(String, ProcessorDescriptor, int)} with the
+ * parallelism set to -1.
+ *
+ * @param vertexName
+ * Name of the vertex
+ * @param processorDescriptor
+ * Description of the processor that is executed in every task of
+ * this vertex
+ */
+ public Vertex(String vertexName, ProcessorDescriptor processorDescriptor) {
+ this(vertexName, processorDescriptor, -1);
}
/**
@@ -120,6 +162,10 @@ public class Vertex {
return parallelism;
}
+ /**
+ * Set the number of tasks for this vertex
+ * @param parallelism Parallelism for this vertex
+ */
void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
@@ -345,6 +391,14 @@ public class Vertex {
return Collections.unmodifiableList(outputVertices);
}
+ /**
+ * Set the cpu/memory etc resources used by tasks of this vertex
+ * @param resource {@link Resource} for the tasks of this vertex
+ */
+ void setTaskResource(Resource resource) {
+ this.taskResource = resource;
+ }
+
List<DataSourceDescriptor> getDataSources() {
return dataSources;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
index d0fd83b..217d0b7 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java
@@ -43,7 +43,6 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.examples.WordCount.TokenProcessor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -118,17 +117,15 @@ public class OrderedWordCount extends Configured implements Tool {
TextOutputFormat.class, outputPath).create();
Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
- TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
+ TokenProcessor.class.getName()));
tokenizerVertex.addDataSource("MRInput", dataSource);
- Vertex summationVertex = new Vertex("Summation",
- new ProcessorDescriptor(
- SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
+ Vertex summationVertex = new Vertex("Summation", new ProcessorDescriptor(
+ SumProcessor.class.getName()), numPartitions);
// 1 task for global sorted order
- Vertex sorterVertex = new Vertex("Sorter",
- new ProcessorDescriptor(
- NoOpSorter.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
+ Vertex sorterVertex = new Vertex("Sorter", new ProcessorDescriptor(
+ NoOpSorter.class.getName()), 1);
sorterVertex.addDataSink("MROutput", dataSink);
OrderedPartitionedKVEdgeConfigurer summationEdgeConf = OrderedPartitionedKVEdgeConfigurer
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 91ec179..74b2cb9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -45,7 +44,6 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -127,8 +125,6 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
- JobConf mrConf = new JobConf(tezConf);
-
int numBroadcastTasks = 2;
int numOneToOneTasks = 3;
if (doLocalityCheck) {
@@ -148,17 +144,14 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
Vertex broadcastVertex = new Vertex("Broadcast", new ProcessorDescriptor(
- InputProcessor.class.getName()),
- numBroadcastTasks, MRHelpers.getMapResource(mrConf));
+ InputProcessor.class.getName()), numBroadcastTasks);
Vertex inputVertex = new Vertex("Input", new ProcessorDescriptor(
- InputProcessor.class.getName()).setUserPayload(procPayload),
- numOneToOneTasks, MRHelpers.getMapResource(mrConf));
+ InputProcessor.class.getName()).setUserPayload(procPayload), numOneToOneTasks);
Vertex oneToOneVertex = new Vertex("OneToOne",
new ProcessorDescriptor(
- OneToOneProcessor.class.getName()).setUserPayload(procPayload),
- -1, MRHelpers.getReduceResource(mrConf));
+ OneToOneProcessor.class.getName()).setUserPayload(procPayload));
oneToOneVertex.setVertexManagerPlugin(
new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 560fd55..817fff5 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
-import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -90,7 +89,6 @@ public class FilterLinesByWord extends Configured implements Tool {
public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
- private TezCounters counters = null;
private boolean exitOnCompletion = false;
public FilterLinesByWord(boolean exitOnCompletion) {
@@ -188,7 +186,7 @@ public class FilterLinesByWord extends Configured implements Tool {
int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
- stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+ stage1NumTasks);
if (generateSplitsInClient) {
stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -212,8 +210,7 @@ public class FilterLinesByWord extends Configured implements Tool {
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
- .createUserPayloadFromConf(stage2Conf)), 1,
- MRHelpers.getReduceResource(stage2Conf));
+ .createUserPayloadFromConf(stage2Conf)), 1);
stage2Vertex.setTaskLocalFiles(commonLocalResources);
// Configure the Output for stage2
@@ -268,7 +265,6 @@ public class FilterLinesByWord extends Configured implements Tool {
}
dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
- counters = dagStatus.getDAGCounters();
} finally {
fs.delete(stagingDir, true);
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 09c55e8..43309b1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -174,7 +174,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
- stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+ stage1NumTasks);
if (generateSplitsInClient) {
stage1Vertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
@@ -198,8 +198,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
- .createUserPayloadFromConf(stage2Conf)), stage1NumTasks,
- MRHelpers.getMapResource(stage2Conf));
+ .createUserPayloadFromConf(stage2Conf)), stage1NumTasks);
stage2Vertex.setTaskLocalFiles(commonLocalResources);
// Configure the Output for stage2
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index d83aa34..3e11870 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -46,7 +46,6 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.TezProcessorContext;
@@ -201,7 +200,7 @@ public class IntersectDataGen extends Configured implements Tool {
Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
- largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
+ largeOutSizePerTask, smallOutSizePerTask)), numTasks);
genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
MROutput.createConfigurer(new Configuration(tezConf),
TextOutputFormat.class, largeOutPath.toUri().toString()).create());
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index cc85324..b226d9c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -46,7 +46,6 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -185,22 +184,21 @@ public class IntersectExample extends Configured implements Tool {
// Change the way resources are setup - no MRHelpers
Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
- ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+ ForwardingProcessor.class.getName())).addDataSource(
"streamfile",
MRInput
.createConfigurer(new Configuration(tezConf), TextInputFormat.class,
streamPath.toUri().toString()).groupSplitsInAM(false).create());
Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
- ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+ ForwardingProcessor.class.getName())).addDataSource(
"hashfile",
MRInput
.createConfigurer(new Configuration(tezConf), TextInputFormat.class,
hashPath.toUri().toString()).groupSplitsInAM(false).create());
Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
- IntersectProcessor.class.getName()), numPartitions,
- MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput",
+ IntersectProcessor.class.getName()), numPartitions).addDataSink("finalOutput",
MROutput.createConfigurer(new Configuration(tezConf),
TextOutputFormat.class, outPath.toUri().toString()).create());
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 9e076ca..5dd7d99 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -44,7 +44,6 @@ import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.Reader;
@@ -189,26 +188,20 @@ public class IntersectValidate extends Configured implements Tool {
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName()).build();
- // Change the way resources are setup - no MRHelpers
Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
- ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource(
- "lhs",
+ ForwardingProcessor.class.getName())).addDataSource("lhs",
MRInput
- .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
- lhs.toUri().toString()).groupSplitsInAM(false).create());
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ lhs.toUri().toString()).groupSplitsInAM(false).create());
Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
- ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource(
- "rhs",
+ ForwardingProcessor.class.getName())).addDataSource("rhs",
MRInput
- .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
- rhs.toUri().toString()).groupSplitsInAM(false).create());
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ rhs.toUri().toString()).groupSplitsInAM(false).create());
- Vertex intersectValidateVertex = new Vertex("intersectvalidate",
- new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
- numPartitions, MRHelpers.getReduceResource(tezConf));
+ Vertex intersectValidateVertex = new Vertex("intersectvalidate", new ProcessorDescriptor(
+ IntersectValidateProcessor.class.getName()), numPartitions);
Edge e1 = new Edge(lhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
Edge e2 = new Edge(rhsVertex, intersectValidateVertex, edgeConf.createDefaultEdgeProperty());
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 23acbe4..1596461 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -541,8 +541,7 @@ public class MRRSleepJob extends Configured implements Tool {
int numTasks = generateSplitsInAM ? -1 : numMapper;
Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
- MapProcessor.class.getName()).setUserPayload(mapUserPayload),
- numTasks, MRHelpers.getMapResource(mapStageConf));
+ MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks);
if (writeSplitsToDFS) {
mapVertex.setLocationHint(new VertexLocationHint(taskLocHint));
@@ -566,8 +565,7 @@ public class MRRSleepJob extends Configured implements Tool {
byte[] iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
Vertex ivertex = new Vertex("ireduce" + (i+1),
new ProcessorDescriptor(ReduceProcessor.class.getName()).
- setUserPayload(iReduceUserPayload), numIReducer,
- MRHelpers.getReduceResource(iconf));
+ setUserPayload(iReduceUserPayload), numIReducer);
ivertex.setTaskLocalFiles(commonLocalResources);
vertices.add(ivertex);
}
@@ -577,8 +575,7 @@ public class MRRSleepJob extends Configured implements Tool {
if (numReducer > 0) {
byte[] reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
- ReduceProcessor.class.getName()).setUserPayload(reducePayload),
- numReducer, MRHelpers.getReduceResource(finalReduceConf));
+ ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
vertices.add(finalReduceVertex);
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index b243186..ccf55af 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -210,8 +210,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
int numMaps = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapPayload)
- .setHistoryText(mapStageHistoryText),
- numMaps, MRHelpers.getMapResource(mapStageConf));
+ .setHistoryText(mapStageHistoryText), numMaps);
if (generateSplitsInClient) {
mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
Map<String, LocalResource> mapLocalResources =
@@ -237,8 +236,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
ReduceProcessor.class.getName())
.setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
- .setHistoryText(iReduceStageHistoryText),
- 2, MRHelpers.getReduceResource(iReduceStageConf));
+ .setHistoryText(iReduceStageHistoryText), 2);
ivertex.setTaskLocalFiles(commonLocalResources);
vertices.add(ivertex);
@@ -250,8 +248,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
new ProcessorDescriptor(
ReduceProcessor.class.getName())
.setUserPayload(finalReducePayload)
- .setHistoryText(finalReduceStageHistoryText), 1,
- MRHelpers.getReduceResource(finalReduceConf));
+ .setHistoryText(finalReduceStageHistoryText), 1);
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
vertices.add(finalReduceVertex);
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 77c6656..90d3090 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -49,7 +49,6 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
import org.apache.tez.mapreduce.output.MROutput;
@@ -175,24 +174,16 @@ public class UnionExample {
DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
- TokenProcessor.class.getName()),
- numMaps, MRHelpers.getMapResource(tezConf));
- mapVertex1.addDataSource("MRInput", dataSource);
+ TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
- TokenProcessor.class.getName()),
- numMaps, MRHelpers.getMapResource(tezConf));
- mapVertex2.addDataSource("MRInput", dataSource);
+ TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
- TokenProcessor.class.getName()),
- numMaps, MRHelpers.getMapResource(tezConf));
- mapVertex3.addDataSource("MRInput", dataSource);
-
- Vertex checkerVertex = new Vertex("checker",
- new ProcessorDescriptor(
- UnionProcessor.class.getName()),
- 1, MRHelpers.getReduceResource(tezConf));
+ TokenProcessor.class.getName()), numMaps).addDataSource("MRInput", dataSource);
+
+ Vertex checkerVertex = new Vertex("checker", new ProcessorDescriptor(
+ UnionProcessor.class.getName()), 1);
Configuration outputConf = new Configuration(tezConf);
DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
http://git-wip-us.apache.org/repos/asf/tez/blob/f184b1a0/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 593eeff..d408146 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -39,7 +39,6 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -118,13 +117,11 @@ public class WordCount extends Configured implements Tool {
TextOutputFormat.class, outputPath).create();
Vertex tokenizerVertex = new Vertex("Tokenizer", new ProcessorDescriptor(
- TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
- tokenizerVertex.addDataSource("Input", dataSource);
+ TokenProcessor.class.getName())).addDataSource("Input", dataSource);
Vertex summerVertex = new Vertex("Summer",
- new ProcessorDescriptor(
- SumProcessor.class.getName()), numPartitions, MRHelpers.getReduceResource(tezConf));
- summerVertex.addDataSink("Output", dataSink);
+ new ProcessorDescriptor(SumProcessor.class.getName()), numPartitions)
+ .addDataSink("Output", dataSink);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),