You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/07/10 23:47:06 UTC
git commit: TEZ-291. Change MRRSleepJob to use tez apis. (hitesh)
Updated Branches:
refs/heads/master 999d16ae1 -> b239ebd78
TEZ-291. Change MRRSleepJob to use tez apis. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b239ebd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b239ebd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b239ebd7
Branch: refs/heads/master
Commit: b239ebd7869f563f65b4c9a581c2b7667a3cf767
Parents: 999d16a
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Jul 10 14:46:34 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Jul 10 14:46:34 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/tez/client/TezClient.java | 53 ++--
.../apache/tez/dag/api/client/DAGStatus.java | 24 +-
.../tez/mapreduce/examples/MRRSleepJob.java | 285 ++++++++++++++++++-
.../tez/mapreduce/ResourceMgrDelegate.java | 8 +-
4 files changed, 328 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index 0147cbb..0569afa 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -79,14 +79,14 @@ public class TezClient {
final public static FsPermission TEZ_AM_DIR_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
- final public static FsPermission TEZ_AM_FILE_PERMISSION =
+ final public static FsPermission TEZ_AM_FILE_PERMISSION =
FsPermission.createImmutable((short) 0644); // rw-r--r--
-
+
public static final int UTF8_CHUNK_SIZE = 16 * 1024;
-
+
private final TezConfiguration conf;
private YarnClient yarnClient;
-
+
/**
* <p>
* Create an instance of the TezClient which will be used to communicate with
@@ -96,7 +96,7 @@ public class TezClient {
* Separate instances of TezClient should be created to communicate with
* different instances of YARN
* </p>
- *
+ *
* @param conf
* the configuration which will be used to establish which YARN or
* Tez service instance this client is associated with.
@@ -107,12 +107,12 @@ public class TezClient {
yarnClient.init(new YarnConfiguration(conf));
yarnClient.start();
}
-
+
/**
* Submit a Tez DAG to YARN as an application. The job will be submitted to
* the yarn cluster or tez service which was specified when creating this
* {@link TezClient} instance.
- *
+ *
* @param dag
* <code>DAG</code> to be submitted
* @param appStagingDir
@@ -149,7 +149,7 @@ public class TezClient {
* Submit a Tez DAG to YARN with known <code>ApplicationId</code>. This is a
* private method and is only meant to be used within Tez for MR client
* backward compatibility.
- *
+ *
* @param appId
* - <code>ApplicationId</code> to be used
* @param dag
@@ -192,7 +192,7 @@ public class TezClient {
return getDAGClient(appId);
}
-
+
/**
* Create a new YARN application
* @return <code>ApplicationId</code> for the new YARN application
@@ -209,7 +209,7 @@ public class TezClient {
}
@Private
- public DAGClient getDAGClient(ApplicationId appId)
+ public DAGClient getDAGClient(ApplicationId appId)
throws IOException, TezException {
return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), conf);
}
@@ -224,7 +224,7 @@ public class TezClient {
vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
}
-
+
public FileSystem ensureExists(Path stagingArea)
throws IOException {
FileSystem fs = stagingArea.getFileSystem(conf);
@@ -254,7 +254,7 @@ public class TezClient {
}
return fs;
}
-
+
private LocalResource createApplicationResource(FileSystem fs, Path p,
LocalResourceType type) throws IOException {
LocalResource rsrc = Records.newRecord(LocalResource.class);
@@ -364,12 +364,12 @@ public class TezClient {
return conf;
}
}
-
+
private ApplicationSubmissionContext createApplicationSubmissionContext(
ApplicationId appId, DAG dag, Path appStagingDir, Credentials ts,
String amQueueName, String amName, List<String> amArgs,
Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
- TezConfiguration amConf) throws IOException, YarnException {
+ TezConfiguration amConf) throws IOException, YarnException {
if (amConf == null) {
amConf = new TezConfiguration();
@@ -403,8 +403,10 @@ public class TezClient {
String amLogLevel = conf.get(TezConfiguration.TEZ_AM_LOG_LEVEL,
TezConfiguration.DEFAULT_TEZ_AM_LOG_LEVEL);
addLog4jSystemProperties(amLogLevel, vargs);
-
- vargs.addAll(amArgs);
+
+ if (amArgs != null) {
+ vargs.addAll(amArgs);
+ }
vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -453,14 +455,19 @@ public class TezClient {
Environment.CLASSPATH.name(),
Environment.PWD.$() + File.separator + "*");
- for (Map.Entry<String, String> entry : amEnv.entrySet()) {
- Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+ if (amEnv != null) {
+ for (Map.Entry<String, String> entry : amEnv.entrySet()) {
+ Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+ }
}
Map<String, LocalResource> localResources =
new TreeMap<String, LocalResource>();
- localResources.putAll(amLocalResources);
+ if (amLocalResources != null) {
+ localResources.putAll(amLocalResources);
+ }
+
Map<String, LocalResource> tezJarResources =
setupTezJarsLocalResources();
localResources.putAll(tezJarResources);
@@ -469,7 +476,7 @@ public class TezClient {
for (Vertex v : dag.getVertices()) {
v.getTaskLocalResources().putAll(tezJarResources);
}
-
+
// emit protobuf DAG file style
Path binaryPath = new Path(appStagingDir,
TezConfiguration.TEZ_AM_PLAN_PB_BINARY + "." + appId.toString());
@@ -481,7 +488,7 @@ public class TezClient {
DAGPlan dagPB = dag.createDag(finalAMConf);
FSDataOutputStream dagPBOutBinaryStream = null;
-
+
try {
//binary output
dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
@@ -502,7 +509,7 @@ public class TezClient {
localResources.put(TezConfiguration.TEZ_AM_PLAN_PB_TEXT,
createApplicationResource(fs, textPath, LocalResourceType.FILE));
}
-
+
Map<ApplicationAccessType, String> acls
= new HashMap<ApplicationAccessType, String>();
@@ -567,7 +574,7 @@ public class TezClient {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(6);
}
-
+
String getDefaultTezDAGID(ApplicationId appId) {
return (new StringBuilder(DAG)).append(SEPARATOR).
append(appId.getClusterTimestamp()).
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index c28f2bf..1dbfd96 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -37,15 +37,15 @@ public class DAGStatus {
FAILED,
ERROR,
};
-
+
DAGStatusProtoOrBuilder proxy = null;
Progress progress = null;
Map<String, Progress> vertexProgress = null;
-
+
public DAGStatus(DAGStatusProtoOrBuilder proxy) {
this.proxy = proxy;
}
-
+
public State getState() {
switch(proxy.getState()) {
case DAG_SUBMITTED:
@@ -64,11 +64,11 @@ public class DAGStatus {
case DAG_ERROR:
return DAGStatus.State.ERROR;
default:
- throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
+ throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
proxy.getState());
}
}
-
+
public boolean isCompleted() {
State state = getState();
return (state == State.SUCCEEDED ||
@@ -83,7 +83,7 @@ public class DAGStatus {
/**
* Gets overall progress value of the DAG.
- *
+ *
* @return Progress of the DAG. Maybe null when the DAG is not running. Maybe
* null when the DAG is running and the application master cannot be
* reached - e.g. when the execution platform has restarted the
@@ -99,7 +99,7 @@ public class DAGStatus {
/**
* Get the progress of a vertex in the DAG
- *
+ *
* @return Progress of the vertex. May be null when the DAG is not running.
* Maybe null when the DAG is running and the application master
* cannot be reached - e.g. when the execution platform has restarted
@@ -110,7 +110,7 @@ public class DAGStatus {
if(vertexProgress == null) {
if(proxy.getVertexProgressList() != null) {
List<StringProgressPairProto> kvList = proxy.getVertexProgressList();
- vertexProgress = new HashMap<String, Progress>(kvList.size());
+ vertexProgress = new HashMap<String, Progress>(kvList.size());
for(StringProgressPairProto kv : kvList){
vertexProgress.put(kv.getKey(), new Progress(kv.getProgress()));
}
@@ -119,4 +119,12 @@ public class DAGStatus {
return vertexProgress;
}
+  /**
+   * Returns a short human-readable summary of this status in the form
+   * {@code status=<state>, progress=<progress>}, using {@link #getState()}
+   * and the DAG progress. A null progress is rendered as the text
+   * {@code null}.
+   */
+  @Override
+  public String toString() {
+    // Chain appends instead of concatenating inside append(), which would
+    // build an intermediate String and defeat the StringBuilder.
+    return new StringBuilder()
+        .append("status=").append(getState())
+        .append(", progress=").append(getDAGProgress())
+        .toString();
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 627d583..1cb4cd1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -22,16 +22,21 @@ import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -43,11 +48,42 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Dummy class for testing MR framefork. Sleeps for a defined period
@@ -359,11 +395,207 @@ public class MRRSleepJob extends Configured implements Tool {
System.exit(res);
}
+  /**
+   * Builds the Tez DAG equivalent of the MRR sleep job.
+   *
+   * <p>The DAG has:
+   * <ul>
+   * <li>one "map" vertex with {@code numMapper} tasks;</li>
+   * <li>{@code iReduceStagesCount} vertices named "ireduce1".."ireduceN"
+   * with {@code numIReducer} tasks each, created only when both counts
+   * are positive;</li>
+   * <li>a final "reduce" vertex with {@code numReducer} tasks, created
+   * only when {@code numReducer > 0}.</li>
+   * </ul>
+   * Each vertex is connected to the previous one by a BIPARTITE / STABLE
+   * edge using {@link OnFileSortedOutput} and {@link ShuffledMergedInput}.
+   *
+   * <p>Side effects: generates input splits under {@code remoteStagingDir}
+   * and copies the jar containing this class to
+   * {@code remoteStagingDir/dag_job.jar} on {@code remoteFs}.
+   *
+   * @param remoteFs file system holding the staging directory
+   * @param conf base configuration copied into every stage configuration
+   * @param appId application id; currently unused, kept for API stability
+   * @param remoteStagingDir staging directory for input splits and the jar
+   * @param numMapper number of map tasks
+   * @param numReducer number of final reduce tasks; 0 omits the final
+   *          reduce vertex
+   * @param iReduceStagesCount number of intermediate reduce stages
+   * @param numIReducer tasks per intermediate reduce stage; 0 omits all
+   *          intermediate stages
+   * @param mapSleepTime total sleep time for map tasks
+   * @param mapSleepCount number of sleep iterations for map tasks
+   * @param reduceSleepTime total sleep time for final reduce tasks
+   * @param reduceSleepCount number of sleep iterations for final reduce tasks
+   * @param iReduceSleepTime total sleep time for intermediate reduce tasks
+   * @param iReduceSleepCount number of sleep iterations for intermediate
+   *          reduce tasks
+   * @return the assembled DAG
+   * @throws IOException if staging files cannot be written, or if the jar
+   *           containing this class cannot be located
+   */
+  public DAG createDAG(FileSystem remoteFs, Configuration conf,
+      ApplicationId appId, Path remoteStagingDir,
+      int numMapper, int numReducer, int iReduceStagesCount,
+      int numIReducer, long mapSleepTime, int mapSleepCount,
+      long reduceSleepTime, int reduceSleepCount,
+      long iReduceSleepTime, int iReduceSleepCount)
+      throws IOException, YarnException {
+
+    // Intermediate stages exist only when both counts are positive. Every
+    // later decision about them must use this same predicate. Otherwise
+    // intermediateReduceStageConfs could be dereferenced while still null
+    // (e.g. iReduceStagesCount > 0 but numIReducer == 0).
+    boolean hasIntermediateStages = iReduceStagesCount > 0 && numIReducer > 0;
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    mapStageConf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    mapStageConf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    mapStageConf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
+    mapStageConf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    mapStageConf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    mapStageConf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+    mapStageConf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+    mapStageConf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        SleepInputFormat.class.getName());
+    mapStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
+        null);
+
+    Configuration[] intermediateReduceStageConfs = null;
+    if (hasIntermediateStages) {
+      intermediateReduceStageConfs = new JobConf[iReduceStagesCount];
+      for (int i = 1; i <= iReduceStagesCount; ++i) {
+        JobConf iReduceStageConf = new JobConf(conf);
+        iReduceStageConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, iReduceSleepTime);
+        iReduceStageConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, iReduceSleepCount);
+        iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, numIReducer);
+        iReduceStageConf
+            .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+            MRRSleepJobPartitioner.class.getName());
+
+        // Each stage is translated relative to its predecessor: the map
+        // stage for the first intermediate stage, else the previous one.
+        Configuration previousStageConf = (i == 1)
+            ? mapStageConf
+            : intermediateReduceStageConfs[i - 2];
+        MultiStageMRConfToTezTranslator.translateVertexConfToTez(
+            iReduceStageConf, previousStageConf);
+        intermediateReduceStageConfs[i - 1] = iReduceStageConf;
+      }
+    }
+
+    Configuration finalReduceConf = null;
+    if (numReducer > 0) {
+      finalReduceConf = new JobConf(conf);
+      finalReduceConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, reduceSleepTime);
+      finalReduceConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, reduceSleepCount);
+      finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, numReducer);
+      finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+          NullOutputFormat.class.getName());
+
+      // The final reduce reads from the last intermediate stage when one
+      // exists, otherwise directly from the map stage.
+      Configuration previousStageConf = hasIntermediateStages
+          ? intermediateReduceStageConfs[iReduceStagesCount - 1]
+          : mapStageConf;
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+          previousStageConf);
+    }
+
+    MRHelpers.doJobClientMagic(mapStageConf);
+    if (hasIntermediateStages) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        MRHelpers.doJobClientMagic(intermediateReduceStageConfs[i]);
+      }
+    }
+    if (numReducer > 0) {
+      MRHelpers.doJobClientMagic(finalReduceConf);
+    }
+
+    InputSplitInfo inputSplitInfo;
+    try {
+      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
+          remoteStagingDir);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers can still observe it.
+      Thread.currentThread().interrupt();
+      throw new TezUncheckedException("Could not generate input splits", e);
+    } catch (ClassNotFoundException e) {
+      throw new TezUncheckedException("Failed to generate input splits", e);
+    }
+
+    DAG dag = new DAG("MRRSleepJob");
+    String jarPath = ClassUtil.findContainingJar(getClass());
+    if (jarPath == null) {
+      // findContainingJar yields null when no containing jar is found
+      // (e.g. running from a classes directory); fail with a clear message
+      // instead of an opaque error from new Path(null).
+      throw new IOException("Could not find the jar containing "
+          + getClass().getName() + "; MRRSleepJob must be run from a jar");
+    }
+    Path remoteJarPath = remoteFs.makeQualified(
+        new Path(remoteStagingDir, "dag_job.jar"));
+    remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources =
+        new HashMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION,
+        jarFileStatus.getLen(),
+        jarFileStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName(),
+        MRHelpers.createByteBufferFromConf(mapStageConf)),
+        numMapper,
+        MRHelpers.getMapResource(mapStageConf));
+    mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    Map<String, LocalResource> mapLocalResources =
+        new HashMap<String, LocalResource>();
+    mapLocalResources.putAll(commonLocalResources);
+    MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
+        mapLocalResources);
+    mapVertex.setTaskLocalResources(mapLocalResources);
+    Map<String, String> mapEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex.setTaskEnvironment(mapEnv);
+    vertices.add(mapVertex);
+
+    if (hasIntermediateStages) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        Configuration iconf = intermediateReduceStageConfs[i];
+        Vertex ivertex = new Vertex("ireduce" + (i + 1),
+            new ProcessorDescriptor(ReduceProcessor.class.getName(),
+                MRHelpers.createByteBufferFromConf(iconf)),
+            numIReducer,
+            MRHelpers.getReduceResource(iconf));
+        ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iconf));
+        // Each vertex gets its own copy so later per-vertex additions to
+        // its local resources cannot leak into other vertices.
+        ivertex.setTaskLocalResources(
+            new HashMap<String, LocalResource>(commonLocalResources));
+        Map<String, String> reduceEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRTasks(iconf, reduceEnv, false);
+        ivertex.setTaskEnvironment(reduceEnv);
+        vertices.add(ivertex);
+      }
+    }
+
+    if (numReducer > 0) {
+      Vertex finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
+          ReduceProcessor.class.getName(),
+          MRHelpers.createByteBufferFromConf(finalReduceConf)),
+          numReducer,
+          MRHelpers.getReduceResource(finalReduceConf));
+      finalReduceVertex.setJavaOpts(
+          MRHelpers.getReduceJavaOpts(finalReduceConf));
+      finalReduceVertex.setTaskLocalResources(
+          new HashMap<String, LocalResource>(commonLocalResources));
+      Map<String, String> reduceEnv = new HashMap<String, String>();
+      MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
+      finalReduceVertex.setTaskEnvironment(reduceEnv);
+      vertices.add(finalReduceVertex);
+    }
+
+    // Chain the vertices linearly: each one shuffles from its predecessor.
+    for (int i = 0; i < vertices.size(); ++i) {
+      dag.addVertex(vertices.get(i));
+      if (i != 0) {
+        dag.addEdge(new Edge(vertices.get(i - 1),
+            vertices.get(i), new EdgeProperty(
+                ConnectionPattern.BIPARTITE, SourceType.STABLE,
+                new OutputDescriptor(
+                    OnFileSortedOutput.class.getName(), null),
+                new InputDescriptor(
+                    ShuffledMergedInput.class.getName(), null))));
+      }
+    }
+
+    return dag;
+  }
+
+ @VisibleForTesting
public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
int numIReducer, long mapSleepTime, int mapSleepCount,
long reduceSleepTime, int reduceSleepCount,
long iReduceSleepTime, int iReduceSleepCount)
- throws IOException {
+ throws IOException {
Configuration conf = getConf();
conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
@@ -385,20 +617,19 @@ public class MRRSleepJob extends Configured implements Tool {
// Set reducer class for intermediate reduce
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
- "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
+ "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
// Set reducer output key class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
- "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
+ "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
// Set reducer output value class
conf.setClass(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
- "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
+ "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
conf.setInt(
MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
- "mapreduce.job.reduces"), numIReducer);
+ "mapreduce.job.reduces"), numIReducer);
}
-
Job job = Job.getInstance(conf, "sleep");
job.setNumReduceTasks(numReducer);
job.setJarByClass(MRRSleepJob.class);
@@ -413,7 +644,6 @@ public class MRRSleepJob extends Configured implements Tool {
job.setSpeculativeExecution(false);
job.setJobName("Sleep job");
-
FileInputFormat.addInputPath(job, new Path("ignored"));
return job;
}
@@ -468,10 +698,45 @@ public class MRRSleepJob extends Configured implements Tool {
mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime));
- Job job = createJob(numMapper, numReducer, iReduceStagesCount, numIReducer,
+
+ TezConfiguration conf = new TezConfiguration();
+ FileSystem remoteFs = FileSystem.get(conf);
+
+ TezClient tezClient = new TezClient(conf);
+ ApplicationId appId =
+ tezClient.createApplication();
+
+ Path remoteStagingDir =
+ remoteFs.makeQualified(new Path(conf.get(
+ TezConfiguration.TEZ_AM_STAGING_DIR,
+ TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
+ appId.toString()));
+ tezClient.ensureExists(remoteStagingDir);
+
+ DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
+ numMapper, numReducer, iReduceStagesCount, numIReducer,
mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
iReduceSleepTime, iReduceSleepCount);
- return job.waitForCompletion(true) ? 0 : 1;
+
+ DAGClient dagClient =
+ tezClient.submitDAGApplication(appId, dag, remoteStagingDir,
+ null, null, null, null, null, conf);
+
+ while (true) {
+ DAGStatus status = dagClient.getDAGStatus();
+ LOG.info("DAG Status: " + status);
+ if (status.isCompleted()) {
+ break;
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ }
+
+ return dagClient.getApplicationReport().getFinalApplicationStatus() ==
+ FinalApplicationStatus.SUCCEEDED ? 0 : 1;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
index 3bf9958..0e767b4 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
@@ -20,6 +20,8 @@ package org.apache.tez.mapreduce;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
public class ResourceMgrDelegate {
@@ -81,7 +84,10 @@ public class ResourceMgrDelegate {
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
try {
- return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf);
+ Set<String> appTypes = new HashSet<String>(1);
+ appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+ return TypeConverter.fromYarnApps(client.getApplications(appTypes),
+ this.conf);
} catch (YarnException e) {
throw new IOException(e);
}