You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC
svn commit: r1469642 [36/36] - in /incubator/tez/branches/TEZ-1: ./
example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/
example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/
tez-common/src/main/ tez-common/src/main/java/ t...
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,1030 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.zookeeper.Environment.Entry;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ.
+ */
+@SuppressWarnings({ "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+ private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+ private ResourceMgrDelegate resMgrDelegate;
+ private ClientCache clientCache;
+ private Configuration conf;
+ private final FileContext defaultFileContext;
+
+ private static final Object classpathLock = new Object();
+ private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+ private static String initialClasspath = null;
+
+ final public static FsPermission DAG_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644);
+
+ /**
+ * Yarn runner incapsulates the client interface of
+ * yarn
+ * @param conf the configuration object for the client
+ */
+ public YARNRunner(Configuration conf) {
+ this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+ }
+
+ /**
+ * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
+ * {@link ResourceMgrDelegate}. Enables mocking and testing.
+ * @param conf the configuration object for the client
+ * @param resMgrDelegate the resourcemanager client handle.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+ this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+ }
+
+ /**
+ * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+ * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+ * @param conf the configuration object
+ * @param resMgrDelegate the resource manager delegate
+ * @param clientCache the client cache object.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+ ClientCache clientCache) {
+ this.conf = conf;
+ try {
+ this.resMgrDelegate = resMgrDelegate;
+ this.clientCache = clientCache;
+ this.defaultFileContext = FileContext.getFileContext(this.conf);
+ } catch (UnsupportedFileSystemException ufe) {
+ throw new RuntimeException("Error in instantiating YarnClient", ufe);
+ }
+ }
+
+ @Private
+ /**
+ * Used for testing mostly.
+ * @param resMgrDelegate the resource manager delegate to set to.
+ */
+ public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+ this.resMgrDelegate = resMgrDelegate;
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Use Token.renew instead");
+ }
+
+ @Override
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getActiveTrackers();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return resMgrDelegate.getAllJobs();
+ }
+
+ @Override
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getBlacklistedTrackers();
+ }
+
+ @Override
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getClusterMetrics();
+ }
+
+ @VisibleForTesting
+ Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("No HistoryServer for Tez");
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException, InterruptedException {
+ // The token is only used for serialization. So the type information
+ // mismatch should be fine.
+ return resMgrDelegate.getDelegationToken(renewer);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return resMgrDelegate.getFilesystemName();
+ }
+
+ @Override
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ return resMgrDelegate.getNewJobID();
+ }
+
+ @Override
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueue(queueName);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getQueues();
+ }
+
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getRootQueues();
+ }
+
+ @Override
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getChildQueues(parent);
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getStagingAreaDir();
+ }
+
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getSystemDir();
+ }
+
+ @Override
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getTaskTrackerExpiryInterval();
+ }
+
+ private Map<String, LocalResource> createJobLocalResources(
+ Configuration jobConf, String jobSubmitDir)
+ throws IOException {
+
+ // Setup LocalResources
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
+ Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+ URL yarnUrlForJobSubmitDir = ConverterUtils
+ .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+ .resolvePath(
+ defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+ LOG.debug("Creating setup context, jobSubmitDir url is "
+ + yarnUrlForJobSubmitDir);
+
+ localResources.put(MRJobConfig.JOB_CONF_FILE,
+ createApplicationResource(defaultFileContext,
+ jobConfPath, LocalResourceType.FILE));
+ if (jobConf.get(MRJobConfig.JAR) != null) {
+ Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+ LocalResource rc = createApplicationResource(defaultFileContext,
+ jobJarPath,
+ LocalResourceType.FILE);
+ // FIXME fix pattern support
+ // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+ // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+ // rc.setPattern(pattern);
+ localResources.put(MRJobConfig.JOB_JAR, rc);
+ } else {
+ // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+ // mapreduce jar itself which is already on the classpath.
+ LOG.info("Job jar is not present. "
+ + "Not adding any jar to the list of resources.");
+ }
+
+ // TODO gross hack
+ for (String s : new String[] {
+ MRJobConfig.JOB_SPLIT,
+ MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.APPLICATION_TOKENS_FILE }) {
+ localResources.put(s,
+ createApplicationResource(defaultFileContext,
+ new Path(jobSubmitDir, s), LocalResourceType.FILE));
+ }
+
+ MRApps.setupDistributedCache(jobConf, localResources);
+
+ return localResources;
+ }
+
+ // FIXME isn't this a nice mess of a client?
+ // read input, write splits, read splits again
+ private TaskLocationHint[] getMapLocationHintsFromInputSplits(JobID jobId,
+ FileSystem fs, Configuration conf,
+ String jobSubmitDir) throws IOException {
+ LOG.info("XXXX Reading splits information");
+ TaskSplitMetaInfo[] splitsInfo =
+ SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
+ new Path(jobSubmitDir));
+ int splitsCount = splitsInfo.length;
+ LOG.info("XXXX Found splits information, splitCount=" + splitsCount);
+ TaskLocationHint[] locationHints =
+ new TaskLocationHint[splitsCount];
+ for (int i = 0; i < splitsCount; ++i) {
+ TaskLocationHint locationHint =
+ new TaskLocationHint(splitsInfo[i].getLocations(), null);
+ locationHints[i] = locationHint;
+ }
+ return locationHints;
+ }
+
+ private static String getInitialClasspath(Configuration conf)
+ throws IOException {
+ synchronized (classpathLock) {
+ if (initialClasspathFlag.get()) {
+ return initialClasspath;
+ }
+ Map<String, String> env = new HashMap<String, String>();
+ MRApps.setClasspath(env, conf);
+ initialClasspath = env.get(Environment.CLASSPATH.name());
+ initialClasspathFlag.set(true);
+ return initialClasspath;
+ }
+ }
+
+ private void setupCommonChildEnv(Configuration conf,
+ Map<String, String> environment) throws IOException {
+
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ getInitialClasspath(conf));
+
+ // Shell
+ environment.put(Environment.SHELL.name(), conf.get(
+ MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
+
+ // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+ Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
+ Environment.PWD.$());
+
+ // Add the env variables passed by the admin
+ Apps.setEnvFromInputString(environment, conf.get(
+ MRJobConfig.MAPRED_ADMIN_USER_ENV,
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
+
+ // FIXME is this really required?
+ // Add stdout/stderr env
+ environment.put(
+ MRJobConfig.STDOUT_LOGFILE_ENV,
+ getTaskLogFile(TaskLog.LogName.STDOUT)
+ );
+ environment.put(
+ MRJobConfig.STDERR_LOGFILE_ENV,
+ getTaskLogFile(TaskLog.LogName.STDERR)
+ );
+
+ }
+
+ private static String getChildEnv(Configuration jobConf, boolean isMap) {
+ if (isMap) {
+ return jobConf.get(MRJobConfig.MAP_ENV, "");
+ }
+ return jobConf.get(MRJobConfig.REDUCE_ENV, "");
+ }
+
+ private static String getChildLogLevel(Configuration conf, boolean isMap) {
+ if (isMap) {
+ return conf.get(
+ MRJobConfig.MAP_LOG_LEVEL,
+ JobConf.DEFAULT_LOG_LEVEL.toString()
+ );
+ } else {
+ return conf.get(
+ MRJobConfig.REDUCE_LOG_LEVEL,
+ JobConf.DEFAULT_LOG_LEVEL.toString()
+ );
+ }
+ }
+
+ private static String getTaskLogFile(LogName filter) {
+ return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
+ + filter.toString();
+ }
+
+ private static void setupLog4jProperties(Configuration jobConf,
+ boolean isMap,
+ Vector<String> vargs,
+ long logSize) {
+ String logLevel = getChildLogLevel(jobConf, isMap);
+ MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+ }
+
+ private void setupMapReduceEnv(Configuration jobConf,
+ Map<String, String> environment, boolean isMap) throws IOException {
+
+ if (isMap) {
+ warnForJavaLibPath(
+ conf.get(MRJobConfig.MAP_JAVA_OPTS,""),
+ "map",
+ MRJobConfig.MAP_JAVA_OPTS,
+ MRJobConfig.MAP_ENV);
+ warnForJavaLibPath(
+ conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
+ "map",
+ MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ } else {
+ warnForJavaLibPath(
+ conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
+ "reduce",
+ MRJobConfig.REDUCE_JAVA_OPTS,
+ MRJobConfig.REDUCE_ENV);
+ warnForJavaLibPath(
+ conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
+ "reduce",
+ MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ }
+
+ setupCommonChildEnv(jobConf, environment);
+
+ // Add the env variables passed by the user
+ String mapredChildEnv = getChildEnv(jobConf, isMap);
+ Apps.setEnvFromInputString(environment, mapredChildEnv);
+
+ // Set logging level in the environment.
+ // This is so that, if the child forks another "bin/hadoop" (common in
+ // streaming) it will have the correct loglevel.
+ environment.put(
+ "HADOOP_ROOT_LOGGER",
+ getChildLogLevel(jobConf, isMap) + ",CLA");
+
+ // FIXME: don't think this is also needed given we already set java
+ // properties.
+ // TODO Change this not to use JobConf.
+ long logSize = TaskLog.getTaskLogLength(new JobConf(jobConf));
+ Vector<String> logProps = new Vector<String>(4);
+ setupLog4jProperties(jobConf, isMap, logProps, logSize);
+ Iterator<String> it = logProps.iterator();
+ StringBuffer buffer = new StringBuffer();
+ while (it.hasNext()) {
+ buffer.append(" " + it.next());
+ }
+
+ // FIXME supposedly required for streaming, should we remove it and let
+ // YARN set it for all containers?
+ String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+ if (hadoopClientOpts == null) {
+ hadoopClientOpts = "";
+ } else {
+ hadoopClientOpts = hadoopClientOpts + " ";
+ }
+ hadoopClientOpts = hadoopClientOpts + buffer.toString();
+ // environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
+
+ // FIXME for this to work, we need YARN-561 and the task runtime changed
+ // to use YARN-561
+
+ addTezClasspathToEnv(conf, environment);
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ getInitialClasspath(conf));
+
+ LOG.info("XXXX Dumping out env for child, isMap=" + isMap);
+ for (Map.Entry<String, String> entry : environment.entrySet()) {
+ LOG.info("XXXX env entry: "
+ + entry.getKey()
+ + "=" + entry.getValue());
+ }
+ }
+
+ private Vertex configureReduceStage(FileSystem fs, JobID jobId,
+ Configuration jobConf, String jobSubmitDir, Credentials ts,
+ Map<String, LocalResource> jobLocalResources, int iReduceIndex)
+ throws IOException {
+ int stageNum = iReduceIndex + 1;
+ Configuration conf = MultiStageMRConfigUtil.getIntermediateStageConf(jobConf, stageNum);
+ int numTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ Vertex vertex = new Vertex(
+ MultiStageMRConfigUtil.getIntermediateReduceVertexName(stageNum),
+ "org.apache.tez.mapreduce.task.IntermediateTask", numTasks);
+
+ Map<String, String> reduceEnv = new HashMap<String, String>();
+ setupMapReduceEnv(conf, reduceEnv, false);
+
+ Resource reduceResource = BuilderUtils.newResource(conf.getInt(
+ MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
+ conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
+
+ Map<String, LocalResource> reduceLocalResources = new TreeMap<String, LocalResource>();
+ reduceLocalResources.putAll(jobLocalResources);
+ // TODO MRR Don't bother localizing the input splits for the reduce vertices.
+
+ vertex.setTaskEnvironment(reduceEnv);
+ vertex.setTaskLocalResources(reduceLocalResources);
+ vertex.setTaskLocationsHint(null);
+ vertex.setTaskResource(reduceResource);
+
+ return vertex;
+ }
+
+ private Vertex[] configureMultStageMRR(FileSystem fs, JobID jobId,
+ JobConf jobConf, String jobSubmitDir, Credentials ts,
+ Map<String, LocalResource> jobLocalResources, DAG dag) throws IOException {
+
+ int numIntermediateStages = MultiStageMRConfigUtil
+ .getNumIntermediateStages(jobConf);
+
+ Vertex[] vertices = new Vertex[numIntermediateStages];
+
+ for (int i = 0; i < numIntermediateStages; i++) {
+ vertices[i] = configureReduceStage(fs, jobId, jobConf, jobSubmitDir, ts,
+ jobLocalResources, i);
+ dag.addVertex(vertices[i]);
+ }
+ return vertices;
+ }
+
+ private DAG createDAG(FileSystem fs, JobID jobId, JobConf jobConf,
+ String jobSubmitDir, Credentials ts,
+ Map<String, LocalResource> jobLocalResources) throws IOException {
+
+ DAG dag = new DAG();
+
+ int numMaps = jobConf.getInt(MRJobConfig.NUM_MAPS, 0);
+ int numReduces = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ int intermediateReduces = jobConf.getInt(
+ MRJobConfig.MRR_INTERMEDIATE_STAGES, 0);
+
+ boolean mapOnly =
+ (numMaps > 0 && numReduces == 0 && intermediateReduces == 0);
+ boolean isMRR = (intermediateReduces > 0);
+
+ LOG.info("XXXX Parsing job config"
+ + ", numMaps=" + numMaps
+ + ", numReduces=" + numReduces
+ + ", intermediateReduces=" + intermediateReduces);
+
+ // configure map vertex
+ String mapProcessor = mapOnly ?
+ "org.apache.tez.mapreduce.task.MapOnlyTask"
+ : "org.apache.tez.mapreduce.task.InitialTask";
+ Vertex mapVertex = new Vertex("map", mapProcessor, numMaps);
+
+ // FIXME set up map environment
+ Map<String, String> mapEnv = new HashMap<String, String>();
+ setupMapReduceEnv(jobConf, mapEnv, true);
+
+ TaskLocationHint[] inputSplitLocations =
+ getMapLocationHintsFromInputSplits(jobId, fs, jobConf, jobSubmitDir);
+
+ Resource mapResource = BuilderUtils.newResource(
+ jobConf.getInt(MRJobConfig.MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB),
+ jobConf.getInt(MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES));
+
+ Map<String, LocalResource> mapLocalResources =
+ new TreeMap<String, LocalResource>();
+ mapLocalResources.putAll(jobLocalResources);
+
+ mapVertex.setTaskEnvironment(mapEnv);
+ mapVertex.setTaskLocalResources(mapLocalResources);
+ mapVertex.setTaskLocationsHint(inputSplitLocations);
+ mapVertex.setTaskResource(mapResource);
+
+ LOG.info("XXXX Adding map vertex to DAG"
+ + ", vertexName=" + mapVertex.getVertexName()
+ + ", processor=" + mapVertex.getProcessorName()
+ + ", parrellism=" + mapVertex.getParallelism());
+ dag.addVertex(mapVertex);
+
+ Vertex[] intermediateVertices = null;
+ // configure intermediate reduces
+ if (isMRR) {
+ intermediateVertices = configureMultStageMRR(fs, jobId, jobConf,
+ jobSubmitDir, ts, jobLocalResources, dag);
+ }
+
+ // configure final reduce vertex
+ if (numReduces > 0) {
+ String reduceProcessor =
+ "org.apache.tez.mapreduce.task.FinalTask";
+ Vertex reduceVertex = new Vertex("reduce", reduceProcessor, numReduces);
+
+ // FIXME set up reduce environment
+ Map<String, String> reduceEnv = new HashMap<String, String>();
+ setupMapReduceEnv(jobConf, reduceEnv, false);
+
+ Resource reduceResource = BuilderUtils.newResource(
+ jobConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
+ jobConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
+
+ Map<String, LocalResource> reduceLocalResources =
+ new TreeMap<String, LocalResource>();
+ reduceLocalResources.putAll(jobLocalResources);
+
+ reduceVertex.setTaskEnvironment(reduceEnv);
+ reduceVertex.setTaskLocalResources(reduceLocalResources);
+ reduceVertex.setTaskLocationsHint(null);
+ reduceVertex.setTaskResource(reduceResource);
+
+ LOG.info("XXXX Adding reduce vertex to DAG"
+ + ", vertexName=" + reduceVertex.getVertexName()
+ + ", processor=" + reduceVertex.getProcessorName()
+ + ", parrellism=" + reduceVertex.getParallelism());
+ dag.addVertex(reduceVertex);
+
+ EdgeProperty edgeProperty = new EdgeProperty();
+ Edge edge = null;
+ if (!isMRR) {
+ edge = new Edge(mapVertex, reduceVertex, edgeProperty);
+ dag.addEdge(edge);
+ } else {
+
+ Edge initialEdge = new Edge(mapVertex, intermediateVertices[0],
+ edgeProperty);
+ dag.addEdge(initialEdge);
+
+ int numIntermediateEdges = intermediateVertices.length - 1;
+ for (int i = 0; i < numIntermediateEdges; i++) {
+ Edge iEdge = new Edge(intermediateVertices[i],
+ intermediateVertices[i + 1], edgeProperty);
+ dag.addEdge(iEdge);
+ }
+
+ Edge finalEdge = new Edge(
+ intermediateVertices[intermediateVertices.length - 1],
+ reduceVertex, edgeProperty);
+ dag.addEdge(finalEdge);
+ }
+
+ }
+
+ return dag;
+ }
+
+ private void addTezClasspathToEnv(Configuration conf,
+ Map<String, String> environment) {
+ for (String c : conf.getStrings(
+ TezConfiguration.TEZ_APPLICATION_CLASSPATH,
+ TezConfiguration.DEFAULT_TEZ_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(environment,
+ ApplicationConstants.Environment.CLASSPATH.name(), c.trim());
+ }
+ }
+
+ private ApplicationSubmissionContext createApplicationSubmissionContext(
+ FileSystem fs, DAG dag,
+ Configuration jobConf, String jobSubmitDir, Credentials ts,
+ Map<String, LocalResource> jobLocalResources) throws IOException {
+
+ ApplicationId applicationId = resMgrDelegate.getApplicationId();
+
+ // Setup resource requirements
+ Resource capability = recordFactory.newRecordInstance(Resource.class);
+ capability.setMemory(
+ conf.getInt(TezConfiguration.DAG_AM_RESOURCE_MEMORY_MB,
+ TezConfiguration.DEFAULT_DAG_AM_RESOURCE_MEMORY_MB));
+ capability.setVirtualCores(
+ conf.getInt(TezConfiguration.DAG_AM_RESOURCE_CPU_VCORES,
+ TezConfiguration.DEFAULT_DAG_AM_RESOURCE_CPU_VCORES));
+ LOG.debug("AppMaster capability = " + capability);
+
+ // Setup security tokens
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+ 0, dob.getLength());
+
+ // Setup the command to run the AM
+ List<String> vargs = new ArrayList<String>(8);
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+ // FIXME set up logging related properties
+ // TODO -Dtez.root.logger??
+ // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+
+ // FIXME admin command opts and user command opts for tez?
+ String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+ warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+ MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+ vargs.add(mrAppMasterAdminOptions);
+
+ // Add AM user command opts
+ String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+ warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+ MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+ vargs.add(mrAppMasterUserOptions);
+
+ vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+
+
+ Vector<String> vargsFinal = new Vector<String>(8);
+ // Final command
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add(mergedCommand.toString());
+
+ LOG.debug("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+
+ // Setup the CLASSPATH in environment
+ // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+ Map<String, String> environment = new HashMap<String, String>();
+ addTezClasspathToEnv(conf, environment);
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+ getInitialClasspath(conf));
+
+ // Setup the environment variables for Admin first
+ MRApps.setEnvFromInputString(environment,
+ conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
+ // Setup the environment variables (LD_LIBRARY_PATH, etc)
+ MRApps.setEnvFromInputString(environment,
+ conf.get(MRJobConfig.MR_AM_ENV));
+
+ // FIXME remove this
+ Map<String, LocalResource> localResources =
+ new TreeMap<String, LocalResource>();
+
+ localResources.putAll(jobLocalResources);
+
+ // FIXME add serialized dag conf
+ DAGConfiguration dagConf = dag.serializeDag();
+
+ Path dagConfFilePath = new Path(jobSubmitDir,
+ TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
+ FSDataOutputStream dagConfOut =
+ FileSystem.create(fs, dagConfFilePath,
+ new FsPermission(DAG_FILE_PERMISSION));
+ try {
+ dagConf.writeXml(dagConfOut);
+ } finally {
+ dagConfOut.close();
+ }
+ localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML,
+ createApplicationResource(defaultFileContext,
+ dagConfFilePath, LocalResourceType.FILE));
+
+ // FIXME add tez conf if needed
+
+ // FIXME are we using MR acls for tez?
+ Map<ApplicationAccessType, String> acls
+ = new HashMap<ApplicationAccessType, String>();
+
+ // Setup ContainerLaunchContext for AM container
+ ContainerLaunchContext amContainer = BuilderUtils
+ .newContainerLaunchContext(UserGroupInformation
+ .getCurrentUser().getShortUserName(), localResources,
+ environment, vargsFinal, null, securityTokens, acls);
+
+ // Set up the ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(applicationId); // ApplicationId
+ appContext.setResource(capability); // resource
+ appContext.setQueue( // Queue name
+ jobConf.get(JobContext.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
+ appContext.setApplicationName( // Job name
+ jobConf.get(JobContext.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
+ appContext.setCancelTokensWhenComplete(
+ conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
+ appContext.setAMContainerSpec(amContainer); // AM Container
+
+ return appContext;
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException {
+
+ // Upload only in security mode: TODO
+ Path applicationTokensFile =
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ try {
+ ts.writeTokenStorageFile(applicationTokensFile, conf);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ FileSystem fs = FileSystem.get(conf);
+ JobConf jobConf = new JobConf(new TezConfiguration(conf));
+
+ // FIXME set up job resources
+ Map<String, LocalResource> jobLocalResources =
+ createJobLocalResources(jobConf, jobSubmitDir);
+ DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts,
+ jobLocalResources);
+ ApplicationSubmissionContext appContext =
+ createApplicationSubmissionContext(fs, dag, jobConf, jobSubmitDir, ts,
+ jobLocalResources);
+
+ // Submit to ResourceManager
+ ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
+
+ ApplicationReport appMasterReport = resMgrDelegate
+ .getApplicationReport(applicationId);
+ String diagnostics =
+ (appMasterReport == null ?
+ "application report is null" : appMasterReport.getDiagnostics());
+ if (appMasterReport == null
+ || appMasterReport.getYarnApplicationState() == YarnApplicationState.FAILED
+ || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ throw new IOException("Failed to run job : " +
+ diagnostics);
+ }
+
+ while (true) {
+ appMasterReport = resMgrDelegate
+ .getApplicationReport(applicationId);
+ diagnostics =
+ (appMasterReport == null ?
+ "application report is null" : appMasterReport.getDiagnostics());
+ if (appMasterReport == null) {
+ throw new IOException("Failed to run job : " +
+ diagnostics);
+ }
+ YarnApplicationState state = appMasterReport.getYarnApplicationState();
+ if (state.equals(YarnApplicationState.FAILED)
+ || state.equals(YarnApplicationState.FINISHED)
+ || state.equals(YarnApplicationState.KILLED)) {
+ LOG.info("Job completed"
+ + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+ + ", finalState=" + appMasterReport.getYarnApplicationState()
+ + ", diagnostics=" + diagnostics);
+ break;
+ } else {
+ LOG.info("Job in progress"
+ + ", finalStatus=" + appMasterReport.getFinalApplicationStatus()
+ + ", finalState=" + appMasterReport.getYarnApplicationState()
+ + ", diagnostics=" + diagnostics);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // FIXME
+ return clientCache.getClient(jobId).getJobStatus(jobId);
+ }
+
+ private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
+ throws IOException {
+ LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+ .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(type);
+ rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ return rsrc;
+ }
+
+ @Override
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ resMgrDelegate.setJobPriority(arg0, arg1);
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return resMgrDelegate.getProtocolVersion(arg0, arg1);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Use Token.renew instead");
+ }
+
+
+ @Override
+ public Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0).getJobCounters(arg0);
+ }
+
+ @Override
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+ return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobID) throws IOException,
+ InterruptedException {
+ JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
+ return status;
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+ int arg2) throws IOException, InterruptedException {
+ return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+ }
+
+ @Override
+ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+ throws IOException, InterruptedException {
+ return clientCache.getClient(jobID)
+ .getTaskReports(jobID, taskType);
+ }
+
+ @Override
+ public void killJob(JobID arg0) throws IOException, InterruptedException {
+ /* check if the status is not running, if not send kill to RM */
+ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+ if (status.getState() != JobStatus.State.RUNNING) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ return;
+ }
+
+ try {
+ /* send a kill to the AM */
+ clientCache.getClient(arg0).killJob(arg0);
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeKillIssued = currentTimeMillis;
+ while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+ != JobStatus.State.KILLED)) {
+ try {
+ Thread.sleep(1000L);
+ } catch(InterruptedException ie) {
+ /** interrupted, just break */
+ break;
+ }
+ currentTimeMillis = System.currentTimeMillis();
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
+ }
+ } catch(IOException io) {
+ LOG.debug("Error when checking for application status", io);
+ }
+ if (status.getState() != JobStatus.State.KILLED) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ }
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String arg0) throws IOException {
+ return new AccessControlList("*");
+ }
+
+ @Override
+ public JobTrackerStatus getJobTrackerStatus() throws IOException,
+ InterruptedException {
+ return JobTrackerStatus.RUNNING;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+ clientMethodsHash);
+ }
+
+ @Override
+ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+ throws IOException {
+ return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+ }
+
+ private static void warnForJavaLibPath(String opts, String component,
+ String javaConf, String envConf) {
+ if (opts != null && opts.contains("-Djava.library.path")) {
+ LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+ "programs to no longer function if hadoop native libraries " +
+ "are used. These values should be set as part of the " +
+ "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+ envConf + " config settings.");
+ }
+ }
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+ @Override
+ public ClientProtocol create(Configuration conf) throws IOException {
+ if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+ return new YARNRunner(conf);
+ }
+ return null;
+ }
+
+ @Override
+ public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+ throws IOException {
+ return create(conf);
+ }
+
+ @Override
+ public void close(ClientProtocol clientProtocol) throws IOException {
+ // nothing to do
+ }
+
+}
Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Thu Apr 18 23:54:18 2013
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.tez.mapreduce.YarnTezClientProtocolProvider