You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/26 02:14:05 UTC
[2/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move
YARNRunner to tez-mapreduce project (bikas)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
new file mode 100644
index 0000000..d3b2c08
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -0,0 +1,232 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate {
+ private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+
+ private YarnConfiguration conf;
+ private GetNewApplicationResponse application;
+ private ApplicationId applicationId;
+ private YarnClient client;
+ private InetSocketAddress rmAddress;
+
+ /**
+  * Creates a delegate that talks to the ResourceManager through a
+  * {@link YarnClient}; the client is initialised and started here.
+  * (The RM-side interface is {@link ApplicationClientProtocol}.)
+  *
+  * @param conf the configuration object.
+  */
+ public ResourceMgrDelegate(YarnConfiguration conf) {
+   this.conf = conf;
+   this.client = YarnClient.createYarnClient();
+   this.client.init(conf);
+   // Resolved up front because getDelegationToken() passes it along.
+   this.rmAddress = conf.getSocketAddr(
+       YarnConfiguration.RM_ADDRESS,
+       YarnConfiguration.DEFAULT_RM_ADDRESS,
+       YarnConfiguration.DEFAULT_RM_PORT);
+   this.client.start();
+ }
+
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnNodes(client.getNodeReports());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+  * Returns the applications known to YARN whose type is
+  * {@code TezConfiguration.TEZ_APPLICATION_TYPE}, converted to
+  * {@link JobStatus} entries.
+  *
+  * @throws IOException wrapping any {@link YarnException} from the client.
+  */
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+   // Restrict the query to the Tez application type.
+   Set<String> tezOnly = new HashSet<String>(1);
+   tezOnly.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+   try {
+     return TypeConverter.fromYarnApps(
+         this.client.getApplications(tezOnly), this.conf);
+   } catch (YarnException yarnFailure) {
+     throw new IOException(yarnFailure);
+   }
+ }
+
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ // TODO: Implement getBlacklistedTrackers
+ LOG.warn("getBlacklistedTrackers - Not implemented yet");
+ return new TaskTrackerInfo[0];
+ }
+
+ /**
+  * Builds a legacy {@link ClusterMetrics} object from the YARN cluster
+  * metrics. Only the arguments derived from the NodeManager count vary; the
+  * remaining constructor arguments are fixed literals.
+  *
+  * @throws IOException wrapping any {@link YarnException} from the client.
+  */
+ public ClusterMetrics getClusterMetrics()
+     throws IOException, InterruptedException {
+   final int nodeManagers;
+   try {
+     nodeManagers = this.client.getYarnClusterMetrics().getNumNodeManagers();
+   } catch (YarnException yarnFailure) {
+     throw new IOException(yarnFailure);
+   }
+   return new ClusterMetrics(1, 1, 1, 1, 1, 1,
+       nodeManagers * 10, nodeManagers * 2, 1,
+       nodeManagers, 0, 0);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Token getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ try {
+ // Remove rmAddress after YARN-868 is addressed
+ return ConverterUtils.convertFromYarn(
+ client.getRMDelegationToken(renewer), rmAddress);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ try {
+ this.application =
+ client.createApplication().getNewApplicationResponse();
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ this.applicationId = this.application.getApplicationId();
+ return TypeConverter.fromYarn(applicationId);
+ }
+
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarn(
+ client.getQueueInfo(queueName), this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueUserAclsInfo(
+ client.getQueueAclsInfo());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+  * Returns all queues reported by the YARN client, converted to
+  * {@link QueueInfo} entries.
+  *
+  * @throws IOException wrapping any {@link YarnException} from the client.
+  */
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+   try {
+     return TypeConverter.fromYarnQueueInfo(
+         this.client.getAllQueues(), this.conf);
+   } catch (YarnException yarnFailure) {
+     throw new IOException(yarnFailure);
+   }
+ }
+
+ /**
+  * Returns the root queues reported by the YARN client, converted to
+  * {@link QueueInfo} entries.
+  *
+  * @throws IOException wrapping any {@link YarnException} from the client.
+  */
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+   try {
+     return TypeConverter.fromYarnQueueInfo(
+         this.client.getRootQueueInfos(), this.conf);
+   } catch (YarnException yarnFailure) {
+     throw new IOException(yarnFailure);
+   }
+ }
+
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ try {
+ return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
+ this.conf);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+  * Returns the staging directory for the current user, as computed by
+  * {@code MRApps.getStagingAreaDir(conf, user)}.
+  */
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+   String shortUser = UserGroupInformation.getCurrentUser().getShortUserName();
+   Path stagingDir = MRApps.getStagingAreaDir(this.conf, shortUser);
+   LOG.debug("getStagingAreaDir: dir=" + stagingDir);
+   return stagingDir.toString();
+ }
+
+
+ /**
+  * Returns {@code MRJobConfig.JOB_SUBMIT_DIR} after passing it through
+  * {@link Path}.
+  */
+ public String getSystemDir() throws IOException, InterruptedException {
+   return new Path(MRJobConfig.JOB_SUBMIT_DIR).toString();
+ }
+
+
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ return;
+ }
+
+
+ /** Always returns 0; both arguments are ignored. */
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+   return 0L;
+ }
+
+ /**
+  * Returns the application id stored by the most recent
+  * {@link #getNewJobID()} call, or {@code null} if that method has not been
+  * called on this delegate.
+  */
+ public ApplicationId getApplicationId() {
+   return this.applicationId;
+ }
+
+ /**
+  * Asks the YARN client to kill the given application.
+  *
+  * @param appId id of the application to kill.
+  */
+ public void killApplication(ApplicationId appId)
+     throws YarnException, IOException {
+   this.client.killApplication(appId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
new file mode 100644
index 0000000..6b6dd35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -0,0 +1,722 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ.
+ */
+@SuppressWarnings({ "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+ private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+ private ResourceMgrDelegate resMgrDelegate;
+ private ClientCache clientCache;
+ private Configuration conf;
+ private final FileContext defaultFileContext;
+
+ final public static FsPermission DAG_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644);
+ final public static int UTF8_CHUNK_SIZE = 16 * 1024;
+
+ private final TezConfiguration tezConf;
+ private final TezClient tezClient;
+ private DAGClient dagClient;
+
+ /**
+ * Creates a runner that reaches YARN through a new
+ * {@link ResourceMgrDelegate} built from {@code conf}.
+ * @param conf the configuration object for the client
+ */
+ public YARNRunner(Configuration conf) {
+ this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+ }
+
+ /**
+ * Similar to {@link #YARNRunner(Configuration)} but allows injecting the
+ * {@link ResourceMgrDelegate}, which enables mocking and testing. A new
+ * {@link ClientCache} is created from the same two arguments.
+ * @param conf the configuration object for the client
+ * @param resMgrDelegate the resourcemanager client handle.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+ this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+ }
+
+ /**
+  * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+  * but allows injecting the {@link ClientCache}, which enables mocking and
+  * testing. Also creates the {@link TezClient} and the default
+  * {@link FileContext} from {@code conf}.
+  *
+  * @param conf the configuration object
+  * @param resMgrDelegate the resource manager delegate
+  * @param clientCache the client cache object.
+  * @throws RuntimeException wrapping the
+  *         {@link UnsupportedFileSystemException} raised while setting up.
+  */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+     ClientCache clientCache) {
+   this.conf = conf;
+   this.tezConf = new TezConfiguration(conf);
+   try {
+     this.resMgrDelegate = resMgrDelegate;
+     this.tezClient = new TezClient(tezConf);
+     this.clientCache = clientCache;
+     this.defaultFileContext = FileContext.getFileContext(this.conf);
+   } catch (UnsupportedFileSystemException ufe) {
+     // The failure comes from the FileContext lookup, not from a YARN client,
+     // so name the right component in the message.
+     throw new RuntimeException("Error in instantiating FileContext", ufe);
+   }
+ }
+
+ /**
+ * Replaces the resource manager delegate used by this runner.
+ * Used for testing mostly.
+ * @param resMgrDelegate the resource manager delegate to set to.
+ */
+ @VisibleForTesting
+ @Private
+ public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+ this.resMgrDelegate = resMgrDelegate;
+ }
+
+ /**
+  * Not supported through this protocol; the exception message points the
+  * caller at the token's own cancel operation.
+  *
+  * @param arg0 the token to cancel (unused).
+  * @throws UnsupportedOperationException always.
+  */
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+     throws IOException, InterruptedException {
+   // The hint used to say "renew", which is the wrong operation for a cancel.
+   throw new UnsupportedOperationException("Use Token.cancel instead");
+ }
+
+ @Override
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getActiveTrackers();
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getAllJobs()}. */
+ @Override
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getAllJobs();
+ }
+
+ @Override
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getBlacklistedTrackers();
+ }
+
+ @Override
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getClusterMetrics();
+ }
+
+ /**
+  * Delegates to {@link ResourceMgrDelegate#getDelegationToken(Text)}.
+  *
+  * @param renewer the renewer handed to the delegate.
+  */
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+     throws IOException, InterruptedException {
+   // The token is only used for serialization. So the type information
+   // mismatch should be fine.
+   return this.resMgrDelegate.getDelegationToken(renewer);
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getFilesystemName()}. */
+ @Override
+ public String getFilesystemName() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getFilesystemName();
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getNewJobID()}. */
+ @Override
+ public JobID getNewJobID() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getNewJobID();
+ }
+
+ @Override
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueue(queueName);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueueAclsForCurrentUser();
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getQueues()}. */
+ @Override
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getQueues();
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getRootQueues()}. */
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getRootQueues();
+ }
+
+ @Override
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getChildQueues(parent);
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getStagingAreaDir()}. */
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getStagingAreaDir();
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getSystemDir()}. */
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+   return this.resMgrDelegate.getSystemDir();
+ }
+
+ @Override
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getTaskTrackerExpiryInterval();
+ }
+
+ /**
+  * Builds the local resources shared by the job: the job configuration
+  * file, the job jar (when one is configured), the split and split meta-info
+  * files, and whatever {@code MRApps.setupDistributedCache} adds.
+  *
+  * @param jobConf configuration consulted for the job jar and handed to the
+  *        distributed cache setup.
+  * @param jobSubmitDir directory holding the submitted job files.
+  * @return a mutable map from resource name to {@link LocalResource}.
+  * @throws IOException if one of the underlying file system calls fails.
+  */
+ private Map<String, LocalResource> createJobLocalResources(
+     Configuration jobConf, String jobSubmitDir)
+     throws IOException {
+
+   // Setup LocalResources
+   Map<String, LocalResource> localResources =
+       new HashMap<String, LocalResource>();
+
+   // The resolved URL is only used in the debug message, so skip the file
+   // system lookup entirely unless debug logging is enabled.
+   if (LOG.isDebugEnabled()) {
+     URL yarnUrlForJobSubmitDir = ConverterUtils
+         .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+             .resolvePath(
+                 defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+     LOG.debug("Creating setup context, jobSubmitDir url is "
+         + yarnUrlForJobSubmitDir);
+   }
+
+   Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+   localResources.put(MRJobConfig.JOB_CONF_FILE,
+       createApplicationResource(defaultFileContext,
+           jobConfPath, LocalResourceType.FILE));
+
+   // Read the jar setting once instead of querying the configuration twice.
+   String jobJar = jobConf.get(MRJobConfig.JAR);
+   if (jobJar != null) {
+     LocalResource rc = createApplicationResource(defaultFileContext,
+         new Path(jobJar), LocalResourceType.FILE);
+     // FIXME fix pattern support (jar unpack pattern is not set on rc yet)
+     localResources.put(MRJobConfig.JOB_JAR, rc);
+   } else {
+     // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+     // mapreduce jar itself which is already on the classpath.
+     LOG.info("Job jar is not present. "
+         + "Not adding any jar to the list of resources.");
+   }
+
+   // TODO gross hack
+   String[] splitFiles = {
+       MRJobConfig.JOB_SPLIT,
+       MRJobConfig.JOB_SPLIT_METAINFO};
+   for (String splitFile : splitFiles) {
+     localResources.put(splitFile,
+         createApplicationResource(defaultFileContext,
+             new Path(jobSubmitDir, splitFile), LocalResourceType.FILE));
+   }
+
+   MRApps.setupDistributedCache(jobConf, localResources);
+
+   return localResources;
+ }
+
+ /**
+  * Reads the split meta-info stored under {@code jobSubmitDir} and turns the
+  * locations of each split into one {@link TaskLocationHint}, in split order.
+  *
+  * @param jobId id of the job whose splits are read.
+  * @param fs file system passed to the split meta-info reader.
+  * @param conf configuration passed to the split meta-info reader.
+  * @param jobSubmitDir directory holding the submitted job files.
+  * @return one location hint per split.
+  */
+ private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
+     FileSystem fs, Configuration conf,
+     String jobSubmitDir) throws IOException {
+   // FIXME isn't this a nice mess of a client?
+   // read input, write splits, read splits again
+   TaskSplitMetaInfo[] splitMetas = SplitMetaInfoReader.readSplitMetaInfo(
+       jobId, fs, conf, new Path(jobSubmitDir));
+   List<TaskLocationHint> hints =
+       new ArrayList<TaskLocationHint>(splitMetas.length);
+   for (TaskSplitMetaInfo splitMeta : splitMetas) {
+     hints.add(new TaskLocationHint(
+         new HashSet<String>(Arrays.asList(splitMeta.getLocations())),
+         null));
+   }
+   return hints;
+ }
+
+ private void setupMapReduceEnv(Configuration jobConf,
+ Map<String, String> environment, boolean isMap) throws IOException {
+
+ if (isMap) {
+ warnForJavaLibPath(
+ jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
+ "map",
+ MRJobConfig.MAP_JAVA_OPTS,
+ MRJobConfig.MAP_ENV);
+ warnForJavaLibPath(
+ jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
+ "map",
+ MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ } else {
+ warnForJavaLibPath(
+ jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
+ "reduce",
+ MRJobConfig.REDUCE_JAVA_OPTS,
+ MRJobConfig.REDUCE_ENV);
+ warnForJavaLibPath(
+ jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
+ "reduce",
+ MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+ MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ }
+
+ MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+ }
+
+ private Vertex createVertexForStage(Configuration stageConf,
+ Map<String, LocalResource> jobLocalResources,
+ List<TaskLocationHint> locations, int stageNum, int totalStages)
+ throws IOException {
+ // stageNum starts from 0, goes till numStages - 1
+ boolean isMap = false;
+ if (stageNum == 0) {
+ isMap = true;
+ }
+
+ int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
+ : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ String processorName = isMap ? MapProcessor.class.getName()
+ : ReduceProcessor.class.getName();
+ String vertexName = null;
+ if (isMap) {
+ vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+ } else {
+ if (stageNum == totalStages - 1) {
+ vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+ } else {
+ vertexName = MultiStageMRConfigUtil
+ .getIntermediateStageVertexName(stageNum);
+ }
+ }
+
+ Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
+ : MRHelpers.getReduceResource(stageConf);
+ byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+ Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
+ setUserPayload(vertexUserPayload),
+ numTasks, taskResource);
+ if (isMap) {
+ byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
+ MRHelpers.addMRInput(vertex, mapInputPayload, null);
+ }
+ // Map only jobs.
+ if (stageNum == totalStages -1) {
+ MRHelpers.addMROutput(vertex, vertexUserPayload);
+ }
+
+ Map<String, String> taskEnv = new HashMap<String, String>();
+ setupMapReduceEnv(stageConf, taskEnv, isMap);
+
+ Map<String, LocalResource> taskLocalResources =
+ new TreeMap<String, LocalResource>();
+ // PRECOMMIT Remove split localization for reduce tasks if it's being set
+ // here
+ taskLocalResources.putAll(jobLocalResources);
+
+ String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
+ : MRHelpers.getReduceJavaOpts(stageConf);
+
+ vertex.setTaskEnvironment(taskEnv)
+ .setTaskLocalResources(taskLocalResources)
+ .setTaskLocationsHint(locations)
+ .setJavaOpts(taskJavaOpts);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding vertex to DAG" + ", vertexName="
+ + vertex.getVertexName() + ", processor="
+ + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+ + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
+ + ", resources=" + vertex.getTaskResource()
+ // TODO Add localResources and Environment
+ );
+ }
+
+ return vertex;
+ }
+
+ private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
+ String jobSubmitDir, Credentials ts,
+ Map<String, LocalResource> jobLocalResources) throws IOException {
+
+ String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME);
+ DAG dag = new DAG(jobName);
+
+ LOG.info("Number of stages: " + stageConfs.length);
+
+ List<TaskLocationHint> mapInputLocations =
+ getMapLocationHintsFromInputSplits(
+ jobId, fs, stageConfs[0], jobSubmitDir);
+ List<TaskLocationHint> reduceInputLocations = null;
+
+ Vertex[] vertices = new Vertex[stageConfs.length];
+ for (int i = 0; i < stageConfs.length; i++) {
+ vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
+ i == 0 ? mapInputLocations : reduceInputLocations, i,
+ stageConfs.length);
+ }
+
+ for (int i = 0; i < vertices.length; i++) {
+ dag.addVertex(vertices[i]);
+ if (i > 0) {
+ EdgeProperty edgeProperty = new EdgeProperty(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(OnFileSortedOutput.class.getName()),
+ new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
+
+ Edge edge = null;
+ edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
+ dag.addEdge(edge);
+ }
+
+ }
+ return dag;
+ }
+
+ /**
+  * Returns a copy of {@code tezConf} in which every MR key listed in
+  * {@code DeprecatedKeys.getMRToDAGParamMap()} that has a value is moved to
+  * its Tez key: the value is set under the Tez key and the MR key is unset.
+  */
+ private TezConfiguration getDAGAMConfFromMRConf() {
+   TezConfiguration finalConf = new TezConfiguration(this.tezConf);
+
+   for (Entry<String, String> mapping
+       : DeprecatedKeys.getMRToDAGParamMap().entrySet()) {
+     String mrKey = mapping.getKey();
+     String tezKey = mapping.getValue();
+     String mrValue = finalConf.get(mrKey);
+     if (mrValue == null) {
+       continue;
+     }
+     finalConf.set(tezKey, mrValue);
+     finalConf.unset(mrKey);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("MR->DAG Translating MR key: " + mrKey
+           + " to Tez key: " + tezKey + " with value "
+           + finalConf.get(tezKey));
+     }
+   }
+   return finalConf;
+ }
+
+ /**
+  * Translates the MR job described by the client configuration into a Tez
+  * DAG and submits it as a DAG application.
+  *
+  * @param jobId id of the job being submitted.
+  * @param jobSubmitDir directory holding the submitted job files; also set
+  *        as the Tez AM staging directory.
+  * @param ts credentials handed to the AM configuration.
+  * @return the status of the job, fetched right after submission.
+  * @throws IOException on I/O errors, or wrapping a {@link TezException}
+  *         raised during submission.
+  */
+ @Override
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+     throws IOException, InterruptedException {
+
+   // Normally the id was recorded by the delegate's getNewJobID(). If the
+   // delegate has none (for example because it was replaced afterwards),
+   // derive it from the job id the same way getJobStatus() and killJob() do.
+   ApplicationId appId = resMgrDelegate.getApplicationId();
+   if (appId == null) {
+     appId = TypeConverter.toYarn(jobId).getAppId();
+   }
+
+   FileSystem fs = FileSystem.get(conf);
+   // Loads the job.xml written by the user.
+   JobConf jobConf = new JobConf(new TezConfiguration(conf));
+
+   // Extract individual raw MR configs.
+   Configuration[] stageConfs = MultiStageMRConfToTezTranslator
+       .getStageConfs(jobConf);
+
+   // Transform all confs to use Tez keys. Every stage after the first is
+   // translated together with the configuration of its predecessor.
+   MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
+       null);
+   for (int i = 1; i < stageConfs.length; i++) {
+     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
+         stageConfs[i - 1]);
+   }
+
+   // create inputs to tezClient.submit()
+
+   // FIXME set up job resources
+   Map<String, LocalResource> jobLocalResources =
+       createJobLocalResources(stageConfs[0], jobSubmitDir);
+
+   // FIXME createDAG should take the tezConf as a parameter, instead of using
+   // MR keys.
+   DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
+       jobLocalResources);
+
+   // AM JVM options: admin command opts first, then user command opts, each
+   // followed by a single space.
+   StringBuilder javaOpts = new StringBuilder();
+
+   String mrAppMasterAdminOptions = conf.get(
+       MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+       MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+   warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+       MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+   javaOpts.append(mrAppMasterAdminOptions).append(" ");
+
+   String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+       MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+   warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+       MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+   javaOpts.append(mrAppMasterUserOptions).append(" ");
+
+   // Setup the environment variables for AM
+   Map<String, String> environment = new HashMap<String, String>();
+   MRHelpers.updateEnvironmentForMRAM(conf, environment);
+
+   TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
+   dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
+
+   // Submit to ResourceManager
+   try {
+     dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+         jobSubmitDir);
+     AMConfiguration amConfig = new AMConfiguration(
+         jobConf.get(JobContext.QUEUE_NAME,
+             YarnConfiguration.DEFAULT_QUEUE_NAME),
+         environment,
+         jobLocalResources, dagAMConf, ts);
+     tezClient.submitDAGApplication(appId, dag, amConfig);
+   } catch (TezException e) {
+     throw new IOException(e);
+   }
+
+   return getJobStatus(jobId);
+ }
+
+ /**
+  * Describes the file at {@code p} as an application-visible
+  * {@link LocalResource}, using the size and modification time from its
+  * file status.
+  *
+  * @param fs file context used to stat and resolve the path.
+  * @param p path of the file.
+  * @param type resource type to record.
+  * @return the populated resource record.
+  */
+ private LocalResource createApplicationResource(FileContext fs, Path p,
+     LocalResourceType type) throws IOException {
+   LocalResource resource = Records.newRecord(LocalResource.class);
+   FileStatus status = fs.getFileStatus(p);
+   Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());
+   resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
+   resource.setSize(status.getLen());
+   resource.setTimestamp(status.getModificationTime());
+   resource.setType(type);
+   resource.setVisibility(LocalResourceVisibility.APPLICATION);
+   return resource;
+ }
+
+ @Override
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ resMgrDelegate.setJobPriority(arg0, arg1);
+ }
+
+ /** Delegates to {@link ResourceMgrDelegate#getProtocolVersion(String, long)}. */
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+   return this.resMgrDelegate.getProtocolVersion(arg0, arg1);
+ }
+
+ /**
+  * Not supported through this protocol; the exception message points the
+  * caller at the token's own renew operation.
+  *
+  * @param arg0 the token to renew (unused).
+  * @throws UnsupportedOperationException always.
+  */
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+     throws IOException, InterruptedException {
+   final String hint = "Use Token.renew instead";
+   throw new UnsupportedOperationException(hint);
+ }
+
+
+ @Override
+ public Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0).getJobCounters(arg0);
+ }
+
+ /**
+  * Returns the value of
+  * {@code JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf)}.
+  */
+ @Override
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+   return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(this.conf);
+ }
+
+ // Job id for which the cached dagClient was obtained; null until the first
+ // getJobStatus() call.
+ private JobID dagClientJobId;
+
+ /**
+  * Returns the status of the given job, built from the DAG status and the
+  * application report supplied by the job's DAG client.
+  *
+  * @param jobID id of the job to query.
+  * @return a {@code DAGJobStatus} for the job.
+  * @throws IOException on I/O errors, or wrapping a {@link TezException}
+  *         from the Tez client.
+  */
+ @Override
+ public JobStatus getJobStatus(JobID jobID) throws IOException,
+     InterruptedException {
+   String user = UserGroupInformation.getCurrentUser().getShortUserName();
+   String jobFile = MRApps.getJobFile(conf, user, jobID);
+   try {
+     // The cached client was obtained for one specific application id, so it
+     // must not be reused when the status of a different job is requested.
+     // NOTE(review): the replaced client is simply dropped here; confirm
+     // whether DAGClient needs explicit cleanup.
+     if (dagClient == null || !jobID.equals(dagClientJobId)) {
+       dagClient = tezClient.getDAGClient(
+           TypeConverter.toYarn(jobID).getAppId());
+       dagClientJobId = jobID;
+     }
+     DAGStatus dagStatus = dagClient.getDAGStatus();
+     return new DAGJobStatus(
+         dagClient.getApplicationReport(), dagStatus, jobFile);
+   } catch (TezException e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+  * Fetches task completion events through the client that
+  * {@code clientCache} returns for the job; all three arguments are passed
+  * on unchanged.
+  */
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+     int arg2) throws IOException, InterruptedException {
+   return this.clientCache.getClient(arg0)
+       .getTaskCompletionEvents(arg0, arg1, arg2);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+ }
+
+ /**
+  * Fetches the task reports of the given type through the client that
+  * {@code clientCache} returns for the job.
+  */
+ @Override
+ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+     throws IOException, InterruptedException {
+   return this.clientCache.getClient(jobID).getTaskReports(jobID, taskType);
+ }
+
+ @Override
+ public void killJob(JobID arg0) throws IOException, InterruptedException {
+ /* check if the status is not running, if not send kill to RM */
+ JobStatus status = getJobStatus(arg0);
+ if (status.getState() == JobStatus.State.RUNNING ||
+ status.getState() == JobStatus.State.PREP) {
+ try {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ return;
+ }
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+ }
+
+ /**
+  * Returns an {@link AccessControlList} built from {@code "*"}, whatever
+  * queue name is passed in.
+  */
+ @Override
+ public AccessControlList getQueueAdmins(String arg0) throws IOException {
+   final String aclSpec = "*";
+   return new AccessControlList(aclSpec);
+ }
+
+ @Override
+ public JobTrackerStatus getJobTrackerStatus() throws IOException,
+ InterruptedException {
+ return JobTrackerStatus.RUNNING;
+ }
+
+ /**
+  * Returns the result of the static
+  * {@code ProtocolSignature.getProtocolSignature} helper for this object
+  * and the given arguments.
+  */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+     long clientVersion, int clientMethodsHash) throws IOException {
+   return ProtocolSignature.getProtocolSignature(
+       this, protocol, clientVersion, clientMethodsHash);
+ }
+
+ /**
+  * Fetches the log file parameters of a task attempt through the client
+  * that {@code clientCache} returns for the job.
+  *
+  * @throws IOException on I/O errors, or wrapping a {@link YarnException}
+  *         from the client.
+  */
+ @Override
+ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+     throws IOException {
+   try {
+     return this.clientCache.getClient(jobID)
+         .getLogFilePath(jobID, taskAttemptID);
+   } catch (YarnException yarnFailure) {
+     throw new IOException(yarnFailure);
+   }
+ }
+
+ /**
+  * Logs a warning when the given JVM options contain
+  * {@code -Djava.library.path}; does nothing for {@code null} options or
+  * options without that flag.
+  *
+  * @param opts the JVM options string to inspect (may be {@code null}).
+  * @param component name of the JVM the options apply to, used in the
+  *        message.
+  * @param javaConf name of the configuration key the options came from.
+  * @param envConf name of the configuration key recommended instead.
+  */
+ private static void warnForJavaLibPath(String opts, String component,
+     String javaConf, String envConf) {
+   if (opts == null || !opts.contains("-Djava.library.path")) {
+     return;
+   }
+   LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause "
+       + "programs to no longer function if hadoop native libraries "
+       + "are used. These values should be set as part of the "
+       + "LD_LIBRARY_PATH in the " + component + " JVM env using "
+       + envConf + " config settings.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
new file mode 100644
index 0000000..c36dc9d
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+/**
+ * {@link ClientProtocolProvider} that hands out a {@link YARNRunner} when the
+ * configured framework name equals {@code MRConfig.YARN_TEZ_FRAMEWORK_NAME}.
+ */
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+  /**
+   * Creates a client protocol for the given configuration.
+   *
+   * @param conf configuration whose {@code MRConfig.FRAMEWORK_NAME} value is inspected
+   * @return a new {@link YARNRunner}, or {@code null} when the framework name
+   *         does not equal {@code MRConfig.YARN_TEZ_FRAMEWORK_NAME}
+   */
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    final String frameworkName = conf.get(MRConfig.FRAMEWORK_NAME);
+    final boolean isTezFramework =
+        MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(frameworkName);
+    return isTezFramework ? new YARNRunner(conf) : null;
+  }
+
+  /**
+   * Same as {@link #create(Configuration)}; the address argument is ignored.
+   */
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    final ClientProtocol client = create(conf);
+    return client;
+  }
+
+  /** No-op: this provider holds no per-client resources to release. */
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    // nothing to do
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..88816ca
--- /dev/null
+++ b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.tez.mapreduce.client.YarnTezClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
new file mode 100644
index 0000000..b3c247a
--- /dev/null
+++ b/tez-tests/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>tez-tests</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce-examples</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
new file mode 100644
index 0000000..f98f392
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -0,0 +1,359 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobs {
+
+ private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
+
+ // Mini clusters shared by every test in this class; created in setup() and
+ // stopped/cleared in tearDown().
+ protected static MiniTezCluster mrrTezCluster;
+ protected static MiniDFSCluster dfsCluster;
+
+ // Configuration handed to the MiniDFSCluster builder in setup().
+ private static Configuration conf = new Configuration();
+ // File system obtained from dfsCluster in setup().
+ private static FileSystem remoteFs;
+
+ // Set as the MiniDFSCluster base directory in setup().
+ private static String TEST_ROOT_DIR = "target"
+ + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+ // Parent path of the job output directory used by testRandomWriter().
+ private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
+ TestMRRJobs.class.getSimpleName();
+
+  /**
+   * Starts a two-datanode mini DFS cluster and, when the application jar is
+   * present, a mini Tez cluster that uses that DFS as its default file system.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      final MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(conf);
+      dfsCluster = dfsBuilder.numDataNodes(2).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster != null) {
+      // Already running; nothing more to start.
+      return;
+    }
+
+    mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1, 1, 1);
+    // Separate configuration for the Tez cluster; deliberately not the static
+    // 'conf' that was used for the DFS cluster above.
+    final Configuration tezClusterConf = new Configuration();
+    tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezClusterConf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+    tezClusterConf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+    mrrTezCluster.init(tezClusterConf);
+    mrrTezCluster.start();
+  }
+
+  /** Stops the mini Tez cluster first, then the mini DFS cluster. */
+  @AfterClass
+  public static void tearDown() {
+    stopMiniTezCluster();
+    shutdownMiniDfsCluster();
+  }
+
+  /** Stops the Tez cluster if one was started and clears the reference. */
+  private static void stopMiniTezCluster() {
+    if (mrrTezCluster == null) {
+      return;
+    }
+    mrrTezCluster.stop();
+    mrrTezCluster = null;
+  }
+
+  /** Shuts the DFS cluster down if one was started and clears the reference. */
+  private static void shutdownMiniDfsCluster() {
+    if (dfsCluster == null) {
+      return;
+    }
+    dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+
+  /**
+   * Runs an {@link MRRSleepJob} on the mini Tez cluster and checks that it
+   * succeeds and that the tracking URL ends with the tail of the job id
+   * (everything from its last underscore) followed by "/".
+   */
+  @Test (timeout = 60000)
+  public void testMRRSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJob().");
+
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    final MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(new Configuration(mrrTezCluster.getConfig()));
+
+    final Job job = sleepJob.createJob(1, 1, 1, 1, 1, 1, 1, 1, 1, 1);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+
+    final String trackingUrl = job.getTrackingURL();
+    final String jobId = job.getJobID().toString();
+
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    final String expectedUrlTail = jobId.substring(jobId.lastIndexOf("_")) + "/";
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+        " but didn't Match Job ID " + jobId ,
+        trackingUrl.endsWith(expectedUrlTail));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+  }
+
+  /**
+   * Runs {@link RandomTextWriterJob} configured for 3072 total bytes at 1024
+   * bytes per map and checks that the output directory holds three entries
+   * besides the success marker file.
+   */
+  @Test (timeout = 60000)
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testRandomWriter().");
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    final RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+    final Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
+
+    final Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+
+    final String trackingUrl = job.getTrackingURL();
+    final String jobId = job.getJobID().toString();
+
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    final String expectedUrlTail = jobId.substring(jobId.lastIndexOf("_")) + "/";
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+        " but didn't Match Job ID " + jobId ,
+        trackingUrl.endsWith(expectedUrlTail));
+
+    // Count everything in the output dir except the success marker; there
+    // should be exactly three such files.
+    final RemoteIterator<FileStatus> outputEntries =
+        FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
+            outputDir);
+    int partFiles = 0;
+    while (outputEntries.hasNext()) {
+      final String entryName = outputEntries.next().getPath().getName();
+      if (entryName.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        continue;
+      }
+      partFiles++;
+    }
+    Assert.assertEquals("Number of part files is wrong!", 3, partFiles);
+
+  }
+
+
+  /**
+   * Runs an {@link MRRSleepJob} with {@code MAP_FATAL_ERROR} enabled for all
+   * task ids ("*") and expects the job to end in the FAILED state.
+   */
+  @Test (timeout = 60000)
+  public void testFailingJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingJob().");
+
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    final MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(new Configuration(mrrTezCluster.getConfig()));
+
+    final Job job = sleepJob.createJob(1, 1, 1, 1, 1, 1, 1, 1, 1, 1);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+
+    final Configuration jobConf = job.getConfiguration();
+    jobConf.setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
+
+    job.submit();
+    Assert.assertFalse(job.waitForCompletion(true));
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  /**
+   * Runs an {@link MRRSleepJob} with {@code MAP_THROW_ERROR} enabled for task
+   * id "0" and up to three map attempts, and expects the job to succeed.
+   */
+  @Test (timeout = 60000)
+  public void testFailingAttempt() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingAttempt().");
+
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    final MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(new Configuration(mrrTezCluster.getConfig()));
+
+    final Job job = sleepJob.createJob(1, 1, 1, 1, 1, 1, 1, 1, 1, 1);
+    job.setJarByClass(MRRSleepJob.class);
+    // More than one attempt is allowed here; presumably so a retry can succeed
+    // after the injected error (unlike the fail-fast tests that use 1).
+    job.setMaxMapAttempts(3);
+
+    final Configuration jobConf = job.getConfiguration();
+    jobConf.setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
+
+    job.submit();
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  /**
+   * Runs an {@link MRRSleepJob} with map-output compression switched on
+   * ({@link DefaultCodec}) and checks success and the tracking URL tail.
+   */
+  @Test (timeout = 60000)
+  public void testMRRSleepJobWithCompression() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
+
+    final File appJar = new File(MiniTezCluster.APPJAR);
+    if (!appJar.exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    final MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(new Configuration(mrrTezCluster.getConfig()));
+
+    final Job job = sleepJob.createJob(1, 1, 2, 1, 1, 1, 1, 1, 1, 1);
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+
+    // enable compression
+    final String codecClassName = DefaultCodec.class.getName();
+    job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+        codecClassName);
+
+    job.submit();
+    final String trackingUrl = job.getTrackingURL();
+    final String jobId = job.getJobID().toString();
+
+    Assert.assertTrue(job.waitForCompletion(true));
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    final String expectedUrlTail = jobId.substring(jobId.lastIndexOf("_")) + "/";
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+        " but didn't Match Job ID " + jobId ,
+        trackingUrl.endsWith(expectedUrlTail));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+
+  }
+
+
+ /*
+ //@Test (timeout = 60000)
+ public void testMRRSleepJobWithSecurityOn() throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
+
+ if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+ return;
+ }
+
+ mrrTezCluster.getConfig().set(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
+ mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
+ mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
+ "rm/sightbusy-lx@LOCALHOST");
+ mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
+ "nm/sightbusy-lx@LOCALHOST");
+
+ UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
+
+ // Keep it in here instead of after RM/NM as multiple user logins happen in
+ // the same JVM.
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+ LOG.info("User name is " + user.getUserName());
+ for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+ LOG.info("Token is " + str.encodeToUrlString());
+ }
+ user.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ MRRSleepJob sleepJob = new MRRSleepJob();
+ sleepJob.setConf(mrrTezCluster.getConfig());
+ Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+ // //Job with reduces
+ // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.submit();
+ String trackingUrl = job.getTrackingURL();
+ String jobId = job.getJobID().toString();
+ job.waitForCompletion(true);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ Assert.assertTrue("Tracking URL was " + trackingUrl +
+ " but didn't Match Job ID " + jobId ,
+ trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+ return null;
+ }
+ });
+
+ // TODO later: add explicit "isUber()" checks of some sort
+ }
+ */
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
new file mode 100644
index 0000000..1bc7b4d
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobsDAGApi {
+
+ private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
+
+ // Mini clusters shared by every test in this class; created in setup() and
+ // stopped/cleared in tearDown().
+ protected static MiniTezCluster mrrTezCluster;
+ protected static MiniDFSCluster dfsCluster;
+
+ // Configuration handed to the MiniDFSCluster builder in setup().
+ private static Configuration conf = new Configuration();
+ // File system obtained from dfsCluster in setup(); used to qualify and create
+ // staging directories.
+ private static FileSystem remoteFs;
+
+ // Set as the MiniDFSCluster base directory in setup().
+ private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a two-datanode mini DFS cluster and a mini Tez cluster that uses
+   * that DFS as its default file system.
+   */
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      final MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(conf);
+      dfsCluster = dfsBuilder.numDataNodes(2).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster != null) {
+      // Already running; nothing more to start.
+      return;
+    }
+
+    mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(),
+        1, 1, 1);
+    // Separate configuration for the Tez cluster; deliberately not the static
+    // 'conf' that was used for the DFS cluster above.
+    final Configuration tezClusterConf = new Configuration();
+    tezClusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    mrrTezCluster.init(tezClusterConf);
+    mrrTezCluster.start();
+  }
+
+  /** Stops the mini Tez cluster first, then the mini DFS cluster. */
+  @AfterClass
+  public static void tearDown() {
+    stopMiniTezCluster();
+    shutdownMiniDfsCluster();
+    // TODO Add cleanup code.
+  }
+
+  /** Stops the Tez cluster if one was started and clears the reference. */
+  private static void stopMiniTezCluster() {
+    if (mrrTezCluster == null) {
+      return;
+    }
+    mrrTezCluster.stop();
+    mrrTezCluster = null;
+  }
+
+  /** Shuts the DFS cluster down if one was started and clears the reference. */
+  private static void shutdownMiniDfsCluster() {
+    if (dfsCluster == null) {
+      return;
+    }
+    dfsCluster.shutdown();
+    dfsCluster = null;
+  }
+
+  // Runs the sleep-job DAG through the DAG submit API (rather than the job
+  // client) with every option switched off, and expects it to succeed.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmit() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = false;
+    final boolean killDagWhileRunning = false;
+    final boolean closeSessionBeforeSubmit = false;
+    final boolean genSplitsInAM = false;
+
+    final State dagOutcome = testMRRSleepJobDagSubmitCore(dagViaRPC,
+        killDagWhileRunning, closeSessionBeforeSubmit, genSplitsInAM);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagOutcome);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Runs the sleep-job DAG through the DAG submit API with the
+  // kill-while-running option on, and expects the final state to be KILLED.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = false;
+    final boolean killDagWhileRunning = true;
+    final boolean closeSessionBeforeSubmit = false;
+    final boolean genSplitsInAM = false;
+
+    final State dagOutcome = testMRRSleepJobDagSubmitCore(dagViaRPC,
+        killDagWhileRunning, closeSessionBeforeSubmit, genSplitsInAM);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, dagOutcome);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Runs the sleep-job DAG with submission via RPC enabled (DAG sent to the AM
+  // after it has started) and expects it to succeed.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobViaSession() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = true;
+    final boolean killDagWhileRunning = false;
+    final boolean closeSessionBeforeSubmit = false;
+    final boolean genSplitsInAM = false;
+
+    final State dagOutcome = testMRRSleepJobDagSubmitCore(dagViaRPC,
+        killDagWhileRunning, closeSessionBeforeSubmit, genSplitsInAM);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagOutcome);
+  }
+
+  // Submits two sleep-job DAGs, one after the other, to a single TezSession,
+  // then stops the session and verifies through YARN that the application
+  // finished with a SUCCEEDED final status.
+  @Test(timeout = 120000)
+  public void testMultipleMRRSleepJobViaSession() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(
+        mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+    tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING,
+        tezSession.getSessionStatus());
+
+    // First DAG on the session.
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+
+    // Second DAG re-using the same session.
+    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+
+    ApplicationId appId = tezSession.getApplicationId();
+    tezSession.stop();
+    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+        tezSession.getSessionStatus());
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(mrrTezCluster.getConfig());
+    yarnClient.start();
+    try {
+      // Poll until the application reaches a terminal state. Sleep between
+      // polls so the loop does not spin on the ResourceManager; the @Test
+      // timeout bounds the total wait.
+      ApplicationReport appReport;
+      while (true) {
+        appReport = yarnClient.getApplicationReport(appId);
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        if (appState == YarnApplicationState.FINISHED
+            || appState == YarnApplicationState.FAILED
+            || appState == YarnApplicationState.KILLED) {
+          break;
+        }
+        Thread.sleep(100);
+      }
+
+      Assert.assertEquals(YarnApplicationState.FINISHED,
+          appReport.getYarnApplicationState());
+      Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+          appReport.getFinalApplicationStatus());
+    } finally {
+      // Release the client even when an assertion or RPC fails.
+      yarnClient.stop();
+    }
+  }
+
+  // Runs the sleep-job DAG with submission via RPC and the kill-while-running
+  // option both on, and expects the final state to be KILLED.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = true;
+    final boolean killDagWhileRunning = true;
+    final boolean closeSessionBeforeSubmit = false;
+    final boolean genSplitsInAM = false;
+
+    final State dagOutcome = testMRRSleepJobDagSubmitCore(dagViaRPC,
+        killDagWhileRunning, closeSessionBeforeSubmit, genSplitsInAM);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, dagOutcome);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Create and close a tez session without submitting a job. The returned
+  // state is not checked; the test passes if the core routine does not throw.
+  @Test(timeout = 60000)
+  public void testTezSessionShutdown() throws IOException,
+      InterruptedException, TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = true;
+    final boolean killDagWhileRunning = false;
+    final boolean closeSessionBeforeSubmit = true;
+    final boolean genSplitsInAM = false;
+
+    testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+        closeSessionBeforeSubmit, genSplitsInAM);
+  }
+
+  // Runs the sleep-job DAG via RPC with the generate-splits-in-AM option on.
+  // The returned state is not checked; the test passes if nothing throws.
+  @Test(timeout = 60000)
+  public void testAMSplitGeneration() throws IOException, InterruptedException,
+      TezException, ClassNotFoundException, YarnException {
+    final boolean dagViaRPC = true;
+    final boolean killDagWhileRunning = false;
+    final boolean closeSessionBeforeSubmit = false;
+    final boolean genSplitsInAM = true;
+
+    testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+        closeSessionBeforeSubmit, genSplitsInAM);
+  }
+
+  /**
+   * Convenience overload that runs the core sleep-job DAG scenario without an
+   * existing session: forwards to the five-argument overload with a
+   * {@code null} {@link TezSession}.
+   *
+   * @return the state returned by the five-argument overload
+   */
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      boolean genSplitsInAM) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    final TezSession noSessionToReuse = null;
+    final State dagOutcome = testMRRSleepJobDagSubmitCore(dagViaRPC,
+        killDagWhileRunning, closeSessionBeforeSubmit, noSessionToReuse,
+        genSplitsInAM);
+    return dagOutcome;
+  }
+
+  /** Returns a fresh, empty, mutable environment map. */
+  private Map<String, String> createCommonEnv() {
+    return new HashMap<String, String>();
+  }
+
+ /**
+ * Builds a three-vertex sleep DAG ("map" -> "ireduce" -> "reduce"), submits
+ * it to the mini cluster and polls every 500ms until the DAG completes.
+ *
+ * @param dagViaRPC if true the DAG is submitted through a TezSession;
+ * otherwise it is submitted with TezClient.submitDAGApplication
+ * @param killDagWhileRunning if true, whenever the polled DAG state is
+ * RUNNING the session is stopped (session mode) or tryKillDAG is
+ * called (non-session mode)
+ * @param closeSessionBeforeSubmit only consulted when dagViaRPC is true; the
+ * session is stopped once its YARN application reports RUNNING, the
+ * application is asserted to end FINISHED/SUCCEEDED, and the method
+ * returns without submitting the DAG
+ * @param reUseTezSession session to submit to when dagViaRPC is true; when
+ * null a new session named "testsession" is created and started
+ * here, and stopped after the DAG completes
+ * @param genSplitsInAM if false, input splits are generated here and shipped
+ * as local resources of the map vertex; if true the map vertex is
+ * created with -1 tasks and MRInputAMSplitGenerator as initializer
+ * @return the final DAG state, or null on the closeSessionBeforeSubmit path
+ */
+ public State testMRRSleepJobDagSubmitCore(
+ boolean dagViaRPC,
+ boolean killDagWhileRunning,
+ boolean closeSessionBeforeSubmit,
+ TezSession reUseTezSession,
+ boolean genSplitsInAM) throws IOException,
+ InterruptedException, TezException, ClassNotFoundException,
+ YarnException {
+ LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
+
+ // One JobConf per stage, each seeded from the mini cluster's configuration.
+ JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
+ JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
+ JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
+
+ stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
+ stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
+ stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
+ stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+ stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+ SleepInputFormat.class.getName());
+ stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ MRRSleepJobPartitioner.class.getName());
+
+ stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+ stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+ stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ stage2Conf
+ .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+ stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+ MRRSleepJobPartitioner.class.getName());
+
+ // Copy of stage2Conf with two reducers. It is translated and finalized
+ // below but is not attached to any vertex in this method.
+ JobConf stage22Conf = new JobConf(stage2Conf);
+ stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
+
+ stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+ stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+ stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+ stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+ stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ IntWritable.class.getName());
+ stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ IntWritable.class.getName());
+ stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+ NullOutputFormat.class.getName());
+
+ // Translate each stage's conf, passing the conf of the stage that feeds it
+ // (null for the first stage).
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
+ stage1Conf);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
+ stage1Conf);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
+ stage2Conf); // this also works stage22 as it sets up keys etc
+
+ MRHelpers.doJobClientMagic(stage1Conf);
+ MRHelpers.doJobClientMagic(stage2Conf);
+ MRHelpers.doJobClientMagic(stage22Conf);
+ MRHelpers.doJobClientMagic(stage3Conf);
+
+ // Staging directory is /tmp/<random int below 100000> on the remote FS.
+ Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+ .valueOf(new Random().nextInt(100000))));
+ TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+ // Splits are generated here only when they are not generated in the AM;
+ // inputSplitInfo stays null otherwise.
+ InputSplitInfo inputSplitInfo = null;
+ if (!genSplitsInAM) {
+ inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
+ remoteStagingDir);
+ }
+
+ byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
+ byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
+
+ DAG dag = new DAG("testMRRSleepJobDagSubmit");
+ // -1 tasks and a split-generating initializer when splits come from the AM;
+ // otherwise the task count comes from the splits generated above.
+ int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
+ Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+ : null;
+ Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
+ MapProcessor.class.getName()).setUserPayload(stage1Payload),
+ stage1NumTasks, Resource.newInstance(256, 1));
+ MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
+ Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
+ ReduceProcessor.class.getName()).setUserPayload(
+ MRHelpers.createUserPayloadFromConf(stage2Conf)),
+ 1, Resource.newInstance(256, 1));
+ Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
+ ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
+ 1, Resource.newInstance(256, 1));
+ MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+
+ Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+ Map<String, String> commonEnv = createCommonEnv();
+
+ if (!genSplitsInAM) {
+ // TODO Use utility method post TEZ-205.
+ // Ship the splits file and splits meta-info file to the map tasks.
+ Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+ stage1LocalResources.put(
+ inputSplitInfo.getSplitsFile().getName(),
+ createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ stage1LocalResources.put(
+ inputSplitInfo.getSplitsMetaInfoFile().getName(),
+ createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+ stage1LocalResources.putAll(commonLocalResources);
+
+ stage1Vertex.setTaskLocalResources(stage1LocalResources);
+ stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+ } else {
+ stage1Vertex.setTaskLocalResources(commonLocalResources);
+ }
+
+ stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+ stage1Vertex.setTaskEnvironment(commonEnv);
+
+ // TODO env, resources
+
+ stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
+ stage2Vertex.setTaskLocalResources(commonLocalResources);
+ stage2Vertex.setTaskEnvironment(commonEnv);
+
+ stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
+ stage3Vertex.setTaskLocalResources(commonLocalResources);
+ stage3Vertex.setTaskEnvironment(commonEnv);
+
+ dag.addVertex(stage1Vertex);
+ dag.addVertex(stage2Vertex);
+ dag.addVertex(stage3Vertex);
+
+ // Both edges use the same scatter-gather, persisted, sequential property.
+ Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, new OutputDescriptor(
+ OnFileSortedOutput.class.getName()), new InputDescriptor(
+ ShuffledMergedInputLegacy.class.getName())));
+ Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, new OutputDescriptor(
+ OnFileSortedOutput.class.getName()), new InputDescriptor(
+ ShuffledMergedInputLegacy.class.getName())));
+
+ dag.addEdge(edge1);
+ dag.addEdge(edge2);
+
+ Map<String, LocalResource> amLocalResources =
+ new HashMap<String, LocalResource>();
+ amLocalResources.putAll(commonLocalResources);
+
+ TezConfiguration tezConf = new TezConfiguration(
+ mrrTezCluster.getConfig());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+ remoteStagingDir.toString());
+
+ TezClient tezClient = new TezClient(tezConf);
+ DAGClient dagClient = null;
+ TezSession tezSession = null;
+ boolean reuseSession = reUseTezSession != null;
+ TezSessionConfiguration tezSessionConfig;
+ AMConfiguration amConfig = new AMConfiguration(
+ "default", commonEnv, amLocalResources,
+ tezConf, null);
+ if(!dagViaRPC) {
+ // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
+ dagClient = tezClient.submitDAGApplication(dag, amConfig);
+ } else {
+ // Session mode: reuse the caller's session or start a fresh one. The DAG
+ // itself is submitted further below.
+ if (reuseSession) {
+ tezSession = reUseTezSession;
+ } else {
+ tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+ tezSession = new TezSession("testsession", tezSessionConfig);
+ tezSession.start();
+ }
+ }
+
+ if (dagViaRPC && closeSessionBeforeSubmit) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(mrrTezCluster.getConfig());
+ yarnClient.start();
+ boolean sentKillSession = false;
+ // Two-phase poll: first wait for the session's application to reach
+ // RUNNING and stop the session, then wait for a terminal state.
+ while(true) {
+ Thread.sleep(500l);
+ ApplicationReport appReport =
+ yarnClient.getApplicationReport(tezSession.getApplicationId());
+ if (appReport == null) {
+ continue;
+ }
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ if (!sentKillSession) {
+ if (appState == YarnApplicationState.RUNNING) {
+ tezSession.stop();
+ sentKillSession = true;
+ }
+ } else {
+ if (appState == YarnApplicationState.FINISHED
+ || appState == YarnApplicationState.KILLED
+ || appState == YarnApplicationState.FAILED) {
+ LOG.info("Application completed after sending session shutdown"
+ + ", yarnApplicationState=" + appState
+ + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
+ Assert.assertEquals(YarnApplicationState.FINISHED,
+ appState);
+ Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+ appReport.getFinalApplicationStatus());
+ break;
+ }
+ }
+ }
+ // NOTE(review): a failed assertion above skips this stop() call.
+ yarnClient.stop();
+ return null;
+ }
+
+ if(dagViaRPC) {
+ LOG.info("Submitting dag to tez session with appId="
+ + tezSession.getApplicationId());
+ dagClient = tezSession.submitDAG(dag);
+ Assert.assertEquals(TezSessionStatus.RUNNING,
+ tezSession.getSessionStatus());
+ }
+ DAGStatus dagStatus = dagClient.getDAGStatus();
+ while (!dagStatus.isCompleted()) {
+ LOG.info("Waiting for job to complete. Sleeping for 500ms."
+ + " Current state: " + dagStatus.getState());
+ Thread.sleep(500l);
+ // The kill request is re-issued on every iteration in which the last
+ // polled state is still RUNNING.
+ if(killDagWhileRunning
+ && dagStatus.getState() == DAGStatus.State.RUNNING) {
+ LOG.info("Killing running dag/session");
+ if (dagViaRPC) {
+ tezSession.stop();
+ } else {
+ dagClient.tryKillDAG();
+ }
+ }
+ dagStatus = dagClient.getDAGStatus();
+ }
+ // A session created by this method is stopped here; a caller-supplied
+ // session is left for the caller to manage.
+ if (dagViaRPC && !reuseSession) {
+ tezSession.stop();
+ }
+ return dagStatus.getState();
+ }
+
+ /**
+ * Builds a YARN LocalResource for an existing file.
+ *
+ * The resource URL is derived from the resolved path reported by the file
+ * system; size and timestamp come from the file's status.
+ *
+ * @param fc file system that holds the file
+ * @param file path of the file to expose as a local resource
+ * @param type resource type to record on the LocalResource
+ * @param visibility visibility to record on the LocalResource
+ * @return the LocalResource describing the file
+ * @throws IOException if the file status or resolved path cannot be obtained
+ */
+ private static LocalResource createLocalResource(FileSystem fc, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility)
+ throws IOException {
+ FileStatus status = fc.getFileStatus(file);
+ Path resolvedPath = fc.resolvePath(status.getPath());
+ URL yarnUrl = ConverterUtils.getYarnUrlFromPath(resolvedPath);
+ return LocalResource.newInstance(yarnUrl, type, visibility,
+ status.getLen(), status.getModificationTime());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
new file mode 100644
index 0000000..8926ed8
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -0,0 +1,172 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * On init, this cluster points {@link TezConfiguration#TEZ_LIB_URIS} at the
+ * jar that contains {@link DAGAppMaster}, recreates the MR AM staging
+ * directory and registers the MapReduce shuffle handler as a NodeManager
+ * auxiliary service. On start, it writes the cluster configuration to a
+ * yarn-site.xml file in the test work directory; the location of that file is
+ * available through {@link #getConfigFilePath()}.
+ */
+public class MiniTezCluster extends MiniYARNCluster {
+
+ /** Jar that contains {@link DAGAppMaster}, as located by JarFinder. */
+ public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+ private static final Log LOG = LogFactory.getLog(MiniTezCluster.class);
+
+ private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+ // Set by serviceStart(); null until the cluster has been started.
+ private Path confFilePath;
+
+ /**
+ * Creates a cluster with a single NodeManager.
+ *
+ * @param testName name passed to the underlying MiniYARNCluster
+ */
+ public MiniTezCluster(String testName) {
+ this(testName, 1);
+ }
+
+ /**
+ * Creates a cluster with 4 local dirs and 4 log dirs per NodeManager.
+ *
+ * @param testName name passed to the underlying MiniYARNCluster
+ * @param noOfNMs number of NodeManagers
+ */
+ public MiniTezCluster(String testName, int noOfNMs) {
+ super(testName, noOfNMs, 4, 4);
+ }
+
+ /**
+ * Creates a cluster with explicit directory counts.
+ *
+ * @param testName name passed to the underlying MiniYARNCluster
+ * @param noOfNMs number of NodeManagers
+ * @param numLocalDirs number of local dirs per NodeManager
+ * @param numLogDirs number of log dirs per NodeManager
+ */
+ public MiniTezCluster(String testName, int noOfNMs,
+ int numLocalDirs, int numLogDirs) {
+ super(testName, noOfNMs, numLocalDirs, numLogDirs);
+ }
+
+ /**
+ * Applies the Tez/MR test settings to {@code conf} and then initializes the
+ * underlying YARN cluster with it.
+ *
+ * @param conf configuration to modify in place and initialize with
+ * @throws TezUncheckedException if the Tez app jar does not exist or the
+ * staging/done directories cannot be created
+ */
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+ if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+ "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+ }
+
+ if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+ // nothing defined. set quick delete value
+ conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+ }
+
+ File appJarFile = new File(MiniTezCluster.APPJAR);
+
+ if (!appJarFile.exists()) {
+ String message = "TezAppJar " + MiniTezCluster.APPJAR
+ + " not found. Exiting.";
+ LOG.info(message);
+ throw new TezUncheckedException(message);
+ } else {
+ conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
+ LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+ }
+
+ // Both physical (PMEM) and virtual (VMEM) memory checks are disabled.
+ conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+ conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+ conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+
+ try {
+ // Start from an empty staging directory: delete any existing one first.
+ Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+ new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+ FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+ if (fc.util().exists(stagingPath)) {
+ LOG.info(stagingPath + " exists! deleting...");
+ fc.delete(stagingPath, true);
+ }
+ LOG.info("mkdir: " + stagingPath);
+ fc.mkdir(stagingPath, null, true);
+
+ //mkdir done directory as well
+ String doneDir =
+ JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+ Path doneDirPath = fc.makeQualified(new Path(doneDir));
+ fc.mkdir(doneDirPath, null, true);
+ } catch (IOException e) {
+ throw new TezUncheckedException("Could not create staging directory. ", e);
+ }
+ conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+ //configure the shuffle service in NM
+ conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+ new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+ conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+ ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+ Service.class);
+
+ // Non-standard shuffle port
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+ conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+ DefaultContainerExecutor.class, ContainerExecutor.class);
+
+ // Uberized job execution is turned off for this cluster.
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Starts the underlying YARN cluster, writes its configuration to a
+ * yarn-site.xml file in the test work directory, and then sets the YARN
+ * application classpath to the work directory plus the current JVM
+ * classpath.
+ *
+ * @throws RuntimeException if the configuration file cannot be written
+ */
+ @Override
+ public void serviceStart() throws Exception {
+ super.serviceStart();
+ File workDir = super.getTestWorkDir();
+ Configuration conf = super.getConfig();
+
+ confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+ File confFile = new File(confFilePath.toString());
+ try {
+ confFile.createNewFile();
+ // Close the stream explicitly so the file handle is not leaked and the
+ // XML is fully flushed before anything reads it.
+ FileOutputStream confOut = new FileOutputStream(confFile);
+ try {
+ conf.writeXml(confOut);
+ } finally {
+ confOut.close();
+ }
+ confFile.deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("Could not write cluster configuration to "
+ + confFile.getAbsolutePath(), e);
+ }
+ confFilePath = new Path(confFile.getAbsolutePath());
+ // The classpath is set after the file is written, so this setting is not
+ // part of the yarn-site.xml written above.
+ conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+ LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+ + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+ }
+
+ /**
+ * Returns the path of the yarn-site.xml written by {@link #serviceStart()}.
+ *
+ * @return the configuration file path, or null if the cluster has not been
+ * started
+ */
+ public Path getConfigFilePath() {
+ return confFilePath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/findbugs-exclude.xml b/tez-yarn-client/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-yarn-client/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>