You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:29:54 UTC
svn commit: r1470654 - in /incubator/tez/branches/TEZ-1:
tez-dag-api/src/main/java/org/apache/tez/dag/api/
tez-dag/src/main/java/org/apache/hadoop/mapred/
tez-dag/src/main/java/org/apache/tez/dag/api/impl/
tez-dag/src/main/java/org/apache/tez/dag/app/ ...
Author: sseth
Date: Mon Apr 22 18:29:54 2013
New Revision: 1470654
URL: http://svn.apache.org/r1470654
Log:
TEZ-55. Remove MR references from DAGAppMaster. (sseth)
Added:
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
Modified:
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java Mon Apr 22 18:29:54 2013
@@ -58,7 +58,7 @@ public class DAG { // FIXME rename to To
edges.add(edge);
}
-
+
public void verify() throws TezException { // FIXME better exception
//FIXME are task resources compulsory or will the DAG AM put in a default
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Mon Apr 22 18:29:54 2013
@@ -60,6 +60,8 @@ public class DAGConfiguration extends Co
}
public final static String DAG = "tez.dag.";
+
+ public final static String DAG_AM = DAG + "am.";
public final static String VERTEX = DAG + "vertex.";
@@ -75,6 +77,23 @@ public class DAGConfiguration extends Co
private final static String SEPARATOR = "|";
+ // FIXME This property onwards should be split into a separate class or the
+ // rest of this class needs to be converted into a config name list once
+ // the serialization is changed.
+
+ // TODO Should not be required once all tokens are handled via AppSubmissionContext
+ public static final String JOB_SUBMIT_DIR = DAG + "jobSubmitDir";
+ public static final String APPLICATION_TOKENS_FILE = "appTokens";
+
+ public static final String JOB_NAME = DAG + "job.name";
+ public static final String USER_NAME = DAG + "user.name";
+
+ // TODO Speculator class should be configurable on a per-vertex level.
+ public static final String DAG_AM_SPECULATOR_CLASS = DAG_AM + "speculator.class";
+
+ public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = DAG_AM + "task.listener.thread-count";
+ public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
@Private
public void setEdgeProperties(List<Edge> edges) {
String[] edgeIds = new String[edges.size()];
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java Mon Apr 22 18:29:54 2013
@@ -107,6 +107,10 @@ public class MRVertexOutputCommitter ext
@Override
public void init(VertexContext context) throws IOException {
+ // TODO VertexContext is not the best way to get the ApplicationAttemptId. No
+ // alternatives right now.
+ context.getConf().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ context.getApplicationAttemptId().getAttemptId());
committer = getOutputCommitter(context);
jobContext = getJobContextFromVertexContext(context);
initialized = true;
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java Mon Apr 22 18:29:54 2013
@@ -19,6 +19,7 @@
package org.apache.tez.dag.api.impl;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.tez.engine.records.TezDAGID;
import org.apache.tez.engine.records.TezVertexID;
@@ -27,6 +28,9 @@ public interface VertexContext {
public Configuration getConf();
public TezDAGID getDAGId();
+
+ // TODO Get rid of this as part of VertexContext cleanup.
+ public ApplicationAttemptId getApplicationAttemptId();
public TezVertexID getVertexId();
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Mon Apr 22 18:29:54 2013
@@ -23,9 +23,7 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,7 +36,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@@ -55,9 +53,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -66,14 +61,10 @@ import org.apache.hadoop.yarn.event.Even
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.DAGConfiguration;
import org.apache.tez.dag.api.DAGLocationHint;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.app.client.ClientService;
import org.apache.tez.dag.app.client.impl.TezClientService;
import org.apache.tez.dag.app.dag.DAG;
@@ -104,7 +95,6 @@ import org.apache.tez.dag.app.rm.RMConta
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
@@ -116,7 +106,6 @@ import org.apache.tez.dag.app.taskclean.
import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
import org.apache.tez.engine.common.security.JobTokenSecretManager;
import org.apache.tez.engine.records.TezDAGID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
/**
* The Map-Reduce Application Master.
@@ -227,9 +216,7 @@ public class DAGAppMaster extends Compos
// Job name is the same as the app name util we support DAG of jobs
// for an app later
- appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
-
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+ appName = conf.get(DAGConfiguration.JOB_NAME, "<missing app name>");
dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
@@ -664,10 +651,10 @@ public class DAGAppMaster extends Compos
// Read the file-system tokens from the localized tokens-file.
Path jobSubmitDir =
FileContext.getLocalFSFileContext().makeQualified(
- new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+ new Path(new File(DAGConfiguration.JOB_SUBMIT_DIR)
.getAbsolutePath()));
Path jobTokenFile =
- new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ new Path(jobSubmitDir, DAGConfiguration.APPLICATION_TOKENS_FILE);
fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+ jobTokenFile);
@@ -727,7 +714,7 @@ public class DAGAppMaster extends Compos
try {
speculatorClass
// "yarn.mapreduce.job.speculator.class"
- = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+ = conf.getClass(DAGConfiguration.DAG_AM_SPECULATOR_CLASS,
DefaultSpeculator.class,
Speculator.class);
Constructor<? extends Speculator> speculatorConstructor
@@ -738,19 +725,19 @@ public class DAGAppMaster extends Compos
return result;
} catch (InstantiationException ex) {
LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
throw new YarnException(ex);
} catch (IllegalAccessException ex) {
LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
throw new YarnException(ex);
} catch (InvocationTargetException ex) {
LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
throw new YarnException(ex);
} catch (NoSuchMethodException ex) {
LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
throw new YarnException(ex);
}
}
@@ -765,16 +752,16 @@ public class DAGAppMaster extends Compos
protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
Configuration conf) {
TaskHeartbeatHandler thh = new TaskHeartbeatHandler(context, conf.getInt(
- MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
- MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+ DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
+ DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
return thh;
}
protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
Configuration conf) {
ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
- MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
- MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+ DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
+ DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
// TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
return chh;
}
@@ -1027,7 +1014,7 @@ public class DAGAppMaster extends Compos
@Override
public String getUser() {
- return this.conf.get(MRJobConfig.USER_NAME);
+ return this.conf.get(DAGConfiguration.USER_NAME);
}
@Override
@@ -1294,9 +1281,9 @@ public class DAGAppMaster extends Compos
LOG.info("Running job type: " + type);
if (type.equals("mr")) {
- dagConf = (DAGConfiguration)createDAGConfigurationForMR();
+ dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
} else if (type.equals("mrr")) {
- dagConf = (DAGConfiguration)createDAGConfigurationForMRR();
+ dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
}
} else {
dagConf = new DAGConfiguration();
@@ -1317,7 +1304,8 @@ public class DAGAppMaster extends Compos
// the objects myself.
dagConf.setBoolean("fs.automatic.close", false);
- // TODO TEZ HACK - user name in DAGConfiguration
+ dagConf.set(DAGConfiguration.USER_NAME, jobUserName);
+ // TODO Remove after fixing TaskLaunch JVM construction
dagConf.set(MRJobConfig.USER_NAME, jobUserName);
initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
@@ -1338,163 +1326,7 @@ public class DAGAppMaster extends Compos
return opts;
}
- //TODO remove once client is in place
- private static Path getMRBaseDir() throws IOException {
- Path basePath = MRApps.getStagingAreaDir(new Configuration(),
- UserGroupInformation.getCurrentUser().getShortUserName());
- return new Path(basePath, "dagTest");
- }
-
- private static Path getMRRBaseDir() throws IOException {
- Path basePath = MRApps.getStagingAreaDir(new Configuration(),
- UserGroupInformation.getCurrentUser().getShortUserName());
- return new Path(basePath, "mrrTest");
- }
-
- private static String getConfFileName(String vertexName) {
- return MRJobConfig.JOB_CONF_FILE + "_" + vertexName;
- }
-
- // TODO remove once client is in place
- private static Map<String, LocalResource> createLocalResources(
- Path remoteBaseDir, String[] resourceNames) throws IOException {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
-
- Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
-
- for (String resourceName : resourceNames) {
- Path remoteFile = new Path(remoteBaseDir, resourceName);
- localResources.put(resourceName, AMContainerHelpers.createLocalResource(
- fs, remoteFile, LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION));
- LOG.info("Localizing file " + resourceName + " from location "
- + remoteFile.toString());
- }
- return localResources;
- }
-
-
- private static String[] getMRLocalRsrcList() {
- String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
- MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
- MRJobConfig.JOB_CONF_FILE };
- return resourceNames;
- }
-
- private static String[] getMRRLocalRsrcList() {
- String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
- MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
- MRJobConfig.JOB_CONF_FILE, getConfFileName("reduce1"),
- getConfFileName("reduce2") };
- return resourceNames;
- }
-
- private static Configuration createDAGConfigurationForMRR() throws IOException {
- org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
- Vertex mapVertex = new Vertex("map",
- "org.apache.tez.mapreduce.task.InitialTask", 6);
- Vertex reduce1Vertex = new Vertex("reduce1",
- "org.apache.tez.mapreduce.task.IntermediateTask", 3);
- Vertex reduce2Vertex = new Vertex("reduce2",
- "org.apache.tez.mapreduce.task.FinalTask", 3);
- Edge edge1 = new Edge(mapVertex, reduce1Vertex, new EdgeProperty());
- Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex, new EdgeProperty());
- Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
- getMRRLocalRsrcList());
-
- Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
- Map<String, LocalResource> reduce1Rsrcs = new HashMap<String, LocalResource>();
- Map<String, LocalResource> reduce2Rsrcs = new HashMap<String, LocalResource>();
-
- mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
- mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
- mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
- mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
- mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
- reduce1Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
- reduce1Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
- reduce1Rsrcs.put(getConfFileName("reduce1"), jobRsrcs.get(getConfFileName("reduce1")));
-
- reduce2Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
- reduce2Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
- reduce2Rsrcs.put(getConfFileName("reduce2"), jobRsrcs.get(getConfFileName("reduce2")));
-
- Resource mapResource = BuilderUtils.newResource(
- MRJobConfig.DEFAULT_MAP_MEMORY_MB,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES);
- mapVertex.setTaskResource(mapResource);
- mapVertex.setTaskLocalResources(mapRsrcs);
- Resource reduceResource = BuilderUtils.newResource(
- MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
- MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
- reduce1Vertex.setTaskResource(reduceResource);
- reduce1Vertex.setTaskLocalResources(reduce1Rsrcs);
-
- reduce1Vertex.setTaskResource(reduceResource);
- reduce2Vertex.setTaskLocalResources(reduce2Rsrcs);
-
- dag.addVertex(mapVertex);
- dag.addVertex(reduce1Vertex);
- dag.addVertex(reduce2Vertex);
- dag.addEdge(edge1);
- dag.addEdge(edge2);
- dag.verify();
- DAGConfiguration dagConf = dag.serializeDag();
-
- dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
- dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
- return dagConf;
- }
-
- // TODO remove once client is in place
- private static Configuration createDAGConfigurationForMR() throws IOException {
- org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
- Vertex mapVertex = new Vertex("map",
- "org.apache.tez.mapreduce.task.InitialTask", 6);
- Vertex reduceVertex = new Vertex("reduce",
- "org.apache.tez.mapreduce.task.FinalTask", 1);
- Edge edge = new Edge(mapVertex, reduceVertex, new EdgeProperty());
-
- Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
- getMRLocalRsrcList());
-
- Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
- Map<String, LocalResource> reduceRsrcs = new HashMap<String, LocalResource>();
-
- mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
- mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
- mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
- mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
- mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
- reduceRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
- reduceRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
- reduceRsrcs.put(getConfFileName("reduce"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
- Resource mapResource = BuilderUtils.newResource(
- MRJobConfig.DEFAULT_MAP_MEMORY_MB,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES);
- mapVertex.setTaskResource(mapResource);
- mapVertex.setTaskLocalResources(mapRsrcs);
- Resource reduceResource = BuilderUtils.newResource(
- MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
- MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
- reduceVertex.setTaskResource(reduceResource);
- reduceVertex.setTaskLocalResources(reduceRsrcs);
- dag.addVertex(mapVertex);
- dag.addVertex(reduceVertex);
- dag.addEdge(edge);
- dag.verify();
- DAGConfiguration dagConf = dag.serializeDag();
-
- dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
- dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
- return dagConf;
- }
// The shutdown hook that runs when a signal is received AND during normal
// close of the JVM.
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java?rev=1470654&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java Mon Apr 22 18:29:54 2013
@@ -0,0 +1,189 @@
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class MRRExampleHelper {
+
+ private static final Log LOG = LogFactory.getLog(MRRExampleHelper.class);
+
+ //TODO remove once client is in place
+ private static Path getMRBaseDir() throws IOException {
+ Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ return new Path(basePath, "dagTest");
+ }
+
+ private static Path getMRRBaseDir() throws IOException {
+ Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ return new Path(basePath, "mrrTest");
+ }
+
+ private static String getConfFileName(String vertexName) {
+ return MRJobConfig.JOB_CONF_FILE + "_" + vertexName;
+ }
+
+ // TODO remove once client is in place
+ private static Map<String, LocalResource> createLocalResources(
+ Path remoteBaseDir, String[] resourceNames) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+
+ Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
+
+ for (String resourceName : resourceNames) {
+ Path remoteFile = new Path(remoteBaseDir, resourceName);
+ localResources.put(resourceName, AMContainerHelpers.createLocalResource(
+ fs, remoteFile, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION));
+ LOG.info("Localizing file " + resourceName + " from location "
+ + remoteFile.toString());
+ }
+ return localResources;
+ }
+
+
+ private static String[] getMRLocalRsrcList() {
+ String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.JOB_CONF_FILE };
+ return resourceNames;
+ }
+
+ private static String[] getMRRLocalRsrcList() {
+ String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+ MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.JOB_CONF_FILE, getConfFileName("reduce1"),
+ getConfFileName("reduce2") };
+ return resourceNames;
+ }
+
+ static Configuration createDAGConfigurationForMRR() throws IOException { // TODO remove once client is in place; builds the map -> reduce1 -> reduce2 example DAG
+ org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+ Vertex mapVertex = new Vertex("map",
+ "org.apache.tez.mapreduce.task.InitialTask", 6);
+ Vertex reduce1Vertex = new Vertex("reduce1",
+ "org.apache.tez.mapreduce.task.IntermediateTask", 3);
+ Vertex reduce2Vertex = new Vertex("reduce2",
+ "org.apache.tez.mapreduce.task.FinalTask", 3);
+ Edge edge1 = new Edge(mapVertex, reduce1Vertex, new EdgeProperty());
+ Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex, new EdgeProperty());
+ Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
+ getMRRLocalRsrcList());
+
+ Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduce1Rsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduce2Rsrcs = new HashMap<String, LocalResource>();
+
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+ mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE)); // the map vertex reuses the base job conf file under its per-vertex name
+
+ reduce1Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduce1Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduce1Rsrcs.put(getConfFileName("reduce1"), jobRsrcs.get(getConfFileName("reduce1")));
+
+ reduce2Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduce2Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduce2Rsrcs.put(getConfFileName("reduce2"), jobRsrcs.get(getConfFileName("reduce2")));
+
+ Resource mapResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ mapVertex.setTaskResource(mapResource);
+ mapVertex.setTaskLocalResources(mapRsrcs);
+ Resource reduceResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ reduce1Vertex.setTaskResource(reduceResource);
+ reduce1Vertex.setTaskLocalResources(reduce1Rsrcs);
+
+ reduce2Vertex.setTaskResource(reduceResource); // both reduce vertices share the reduce-sized resource
+ reduce2Vertex.setTaskLocalResources(reduce2Rsrcs);
+
+ dag.addVertex(mapVertex);
+ dag.addVertex(reduce1Vertex);
+ dag.addVertex(reduce2Vertex);
+ dag.addEdge(edge1);
+ dag.addEdge(edge2);
+ dag.verify();
+ DAGConfiguration dagConf = dag.serializeDag();
+
+ dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ return dagConf;
+ }
+
+ // TODO remove once client is in place
+ static Configuration createDAGConfigurationForMR() throws IOException {
+ org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+ Vertex mapVertex = new Vertex("map",
+ "org.apache.tez.mapreduce.task.InitialTask", 6);
+ Vertex reduceVertex = new Vertex("reduce",
+ "org.apache.tez.mapreduce.task.FinalTask", 1);
+ Edge edge = new Edge(mapVertex, reduceVertex, new EdgeProperty());
+
+ Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
+ getMRLocalRsrcList());
+
+ Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> reduceRsrcs = new HashMap<String, LocalResource>();
+
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+ mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+ mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+ reduceRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+ reduceRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+ reduceRsrcs.put(getConfFileName("reduce"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+ Resource mapResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ mapVertex.setTaskResource(mapResource);
+ mapVertex.setTaskLocalResources(mapRsrcs);
+ Resource reduceResource = BuilderUtils.newResource(
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ reduceVertex.setTaskResource(reduceResource);
+ reduceVertex.setTaskLocalResources(reduceRsrcs);
+ dag.addVertex(mapVertex);
+ dag.addVertex(reduceVertex);
+ dag.addEdge(edge);
+ dag.verify();
+ DAGConfiguration dagConf = dag.serializeDag();
+
+ dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ return dagConf;
+ }
+
+}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Mon Apr 22 18:29:54 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -1273,6 +1274,11 @@ public class VertexImpl implements org.a
public TezDAGID getDAGId() {
return appContext.getDAGID();
}
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appContext.getApplicationAttemptId();
+ }
public Resource getTaskResource() {
return taskResource;
@@ -1315,4 +1321,5 @@ public class VertexImpl implements org.a
}
return outputSpecList;
}
+
}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml Mon Apr 22 18:29:54 2013
@@ -34,6 +34,10 @@
<artifactId>tez-engine</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Mon Apr 22 18:29:54 2013
@@ -18,13 +18,39 @@
package org.apache.tez.mapreduce.hadoop;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAGConfiguration;
+
public class DeprecatedKeys {
+
+ // This could be done via deprecation.
+ private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
+
+ /**
+ * Returns the MR-to-DAG parameter name table as a read-only view.
+ * The view wraps the live backing map (it is not a copy); attempts to
+ * modify it throw UnsupportedOperationException.
+ *
+ * @return unmodifiable view of the mapping populated in the static initializer
+ */
+ public static Map<String, String> getMRToDAGParamMap() {
+ return Collections.unmodifiableMap(mrParamToDAGParamMap);
+ }
+
static {
addDeprecatedKeys();
+
+ // Fill the MR -> DAG key-name table served by getMRToDAGParamMap().
+ // Each entry pairs an MRJobConfig key name with its DAGConfiguration counterpart.
+ mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR, DAGConfiguration.JOB_SUBMIT_DIR);
+ mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE, DAGConfiguration.APPLICATION_TOKENS_FILE);
+
+ mrParamToDAGParamMap.put(MRJobConfig.JOB_NAME, DAGConfiguration.JOB_NAME);
+
+ mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_SPECULATOR, DAGConfiguration.DAG_AM_SPECULATOR_CLASS);
+
+ // TODO Default value handling.
+ mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT);
+
+ mrParamToDAGParamMap.put(MRJobConfig.USER_NAME, DAGConfiguration.USER_NAME);
}
// TODO TEZAM4 Sometime, make sure this gets loaded by default. Instead of the current initialization in MRAppMaster, TezChild.
@@ -89,7 +115,7 @@ public class DeprecatedKeys {
_(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
}
-
+
// Shorthand for Configuration.addDeprecation(oldKey, newKey), used to keep the key table compact.
// NOTE(review): `_` is a reserved keyword from Java 9 onwards (and a warning on Java 8);
// rename this helper together with all of its call sites before raising the source level.
private static void _(String oldKey, String newKey) {
Configuration.addDeprecation(oldKey, newKey);
}
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Mon Apr 22 18:29:54 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,9 +60,9 @@ import org.apache.tez.dag.api.EdgeProper
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.zookeeper.Environment.Entry;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -681,6 +682,17 @@ public class YARNRunner implements Clien
}
}
+ /**
+  * Copies MR job settings into the DAG configuration under their DAG key names.
+  * Only MR keys listed in DeprecatedKeys.getMRToDAGParamMap() that have a
+  * non-null value in this runner's conf are copied; nothing else in dagConf is
+  * touched.
+  *
+  * @param dagConf DAG configuration that receives the translated settings
+  */
+ private void setDAGParamsFromMRConf(DAGConfiguration dagConf) {
+   Configuration mrConf = this.conf;
+   Map<String, String> mrParamToDAGParamMap = DeprecatedKeys.getMRToDAGParamMap();
+   for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+     // Look the MR value up once and reuse it for both the null check and the copy.
+     String mrValue = mrConf.get(entry.getKey());
+     if (mrValue != null) {
+       LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue());
+       dagConf.set(entry.getValue(), mrValue);
+     }
+   }
+ }
+
private ApplicationSubmissionContext createApplicationSubmissionContext(
FileSystem fs, DAG dag,
Configuration jobConf, String jobSubmitDir, Credentials ts,
@@ -766,7 +778,8 @@ public class YARNRunner implements Clien
// FIXME add serialized dag conf
DAGConfiguration dagConf = dag.serializeDag();
-
+ setDAGParamsFromMRConf(dagConf);
+
Path dagConfFilePath = new Path(jobSubmitDir,
TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
FSDataOutputStream dagConfOut =