Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/06 09:42:27 UTC
svn commit: r1510877 - in
/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez:
DagUtils.java TezTask.java
Author: gunther
Date: Tue Aug 6 07:42:27 2013
New Revision: 1510877
URL: http://svn.apache.org/r1510877
Log:
HIVE-4918: Tez job submission (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1510877&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Aug 6 07:42:27 2013
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+/**
+ * DagUtils. DagUtils is a collection of helper methods to convert
+ * map and reduce work to tez vertices and edges. It handles configuration
+ * objects, file localization and vertex/edge creation.
+ */
+public class DagUtils {
+
+ /*
+ * Creates the configuration object necessary to run a specific vertex from
+ * map work. This includes input formats, input processor, etc.
+ */
+ private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ if (mapWork.getNumMapTasks() != null) {
+ conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
+ }
+
+ if (mapWork.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
+ mapWork.getMaxSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSize() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
+ mapWork.getMinSplitSize().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
+ mapWork.getMinSplitSizePerNode().longValue());
+ }
+
+ if (mapWork.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
+ mapWork.getMinSplitSizePerRack().longValue());
+ }
+
+ Utilities.setInputAttributes(conf, mapWork);
+
+ String inpFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ if (StringUtils.isBlank(inpFormat)) {
+ inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+ }
+
+ if (mapWork.isUseBucketizedHiveInputFormat()) {
+ inpFormat = BucketizedHiveInputFormat.class.getName();
+ }
+
+ conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
+ conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inpFormat);
+
+ return conf;
+ }
+
+ /**
+ * Given two vertices and their respective configuration objects createEdge
+ * will create an Edge object that connects the two. Currently the edge will
+ * always be a stable bi-partite edge, i.e. a shuffle edge on which every
+ * source task feeds every destination task.
+ *
+ * @param vConf JobConf of the first vertex
+ * @param v The first vertex (source)
+ * @param wConf JobConf of the second vertex
+ * @param w The second vertex (sink)
+ * @return Edge connecting v and w
+ */
+ public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) {
+
+ // Tez needs to set up the source vertex's output and the destination
+ // vertex's subsequent input so the two match
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
+
+ // all edges are of the same type right now
+ EdgeProperty edgeProperty =
+ new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+ new OutputDescriptor(OnFileSortedOutput.class.getName(), null),
+ new InputDescriptor(ShuffledMergedInput.class.getName(), null));
+ return new Edge(v, w, edgeProperty);
+ }
+
+ /*
+ * Helper function to create Vertex from MapWork.
+ */
+ private static Vertex createVertex(JobConf conf, MapWork mapWork, int seqNo,
+ LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws Exception {
+
+ // map work can contain localwork, i.e: hashtables for map-side joins
+ Path hashTableArchive = createHashTables(mapWork, conf);
+ LocalResource localWorkLr = null;
+ if (hashTableArchive != null) {
+ localWorkLr = createLocalResource(fs,
+ hashTableArchive, LocalResourceType.ARCHIVE,
+ LocalResourceVisibility.APPLICATION);
+ }
+
+ // write out the operator plan
+ Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false);
+ LocalResource planLr = createLocalResource(fs,
+ planPath, LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION);
+
+ // setup input paths and split info
+ List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx);
+ Utilities.setInputPaths(conf, inputPaths);
+
+ InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, mrScratchDir);
+ MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, conf);
+
+ // finally create the vertex
+ Vertex map = null;
+ if (inputSplitInfo.getNumTasks() != 0) {
+ map = new Vertex("Map "+seqNo,
+ new ProcessorDescriptor(MapProcessor.class.getName(),
+ MRHelpers.createUserPayloadFromConf(conf)),
+ inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
+ Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
+ map.setTaskEnvironment(environment);
+ map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+
+ map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ if (localWorkLr != null) {
+ localResources.put(hashTableArchive.getName(), localWorkLr);
+ }
+ localResources.put(appJarLr.getResource().getFile(), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(lr.getResource().getFile(), lr);
+ }
+ localResources.put(planPath.getName(), planLr);
+
+ MRHelpers.updateLocalResourcesForInputSplits(FileSystem.get(conf), inputSplitInfo,
+ localResources);
+ map.setTaskLocalResources(localResources);
+ }
+ return map;
+ }
+
+ /*
+ * If the given MapWork has local work embedded we need to generate the corresponding
+ * hash tables and localize them. These tables will be used by the map work to do
+ * map-side joins.
+ */
+ private static Path createHashTables(MapWork mapWork, Configuration conf) {
+ // TODO: not implemented yet; no hash table archive is generated or
+ // localized for now
+ return null;
+ }
+
+ /*
+ * Helper function to create JobConf for specific ReduceWork.
+ */
+ private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
+ JobConf conf = new JobConf(baseConf);
+
+ conf.set(MRJobConfig.REDUCE_CLASS_ATTR, ExecReducer.class.getName());
+
+ boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
+ HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
+ useSpeculativeExecReducers);
+
+ // reducers should have been set at planning stage
+ // job.setNumberOfReducers(rWork.getNumberOfReducers())
+ conf.set(MRJobConfig.NUM_REDUCES, reduceWork.getNumReduceTasks().toString());
+
+ return conf;
+ }
+
+ /*
+ * Helper function to create Vertex for given ReduceWork.
+ */
+ private static Vertex createVertex(JobConf conf, ReduceWork reduceWork, int seqNo,
+ LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
+ Path mrScratchDir, Context ctx) throws IOException {
+
+ // write out the operator plan
+ Path planPath = Utilities.setReduceWork(conf, reduceWork,
+ mrScratchDir.getName(), false);
+ LocalResource planLr = createLocalResource(fs, planPath,
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
+
+ // create the vertex
+ Vertex reducer = new Vertex("Reducer "+seqNo,
+ new ProcessorDescriptor(ReduceProcessor.class.getName(),
+ MRHelpers.createUserPayloadFromConf(conf)),
+ reduceWork.getNumReduceTasks(), MRHelpers.getReduceResource(conf));
+
+ Map<String, String> environment = new HashMap<String, String>();
+
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
+ reducer.setTaskEnvironment(environment);
+
+ reducer.setJavaOpts(MRHelpers.getReduceJavaOpts(conf));
+
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(appJarLr.getResource().getFile(), appJarLr);
+ for (LocalResource lr: additionalLr) {
+ localResources.put(lr.getResource().getFile(), lr);
+ }
+ localResources.put(planPath.getName(), planLr);
+ reducer.setTaskLocalResources(localResources);
+
+ return reducer;
+ }
+
+ /*
+ * Helper method to create a yarn local resource.
+ */
+ private static LocalResource createLocalResource(FileSystem remoteFs, Path file,
+ LocalResourceType type, LocalResourceVisibility visibility) throws IOException {
+
+ // propagate IO errors instead of swallowing them here and failing later
+ // with a NullPointerException when fstat is dereferenced
+ FileStatus fstat = remoteFs.getFileStatus(file);
+
+ URL resourceURL = ConverterUtils.getYarnUrlFromPath(file);
+ long resourceSize = fstat.getLen();
+ long resourceModificationTime = fstat.getModificationTime();
+
+ LocalResource lr = Records.newRecord(LocalResource.class);
+ lr.setResource(resourceURL);
+ lr.setType(type);
+ lr.setSize(resourceSize);
+ lr.setVisibility(visibility);
+ lr.setTimestamp(resourceModificationTime);
+
+ return lr;
+ }
+
+ /**
+ * Localizes files, archives and jars the user has instructed us
+ * to provide on the cluster as resources for execution.
+ *
+ * @param conf Configuration that holds the user supplied jars, files and archives
+ * @return List<LocalResource> local resources to add to execution
+ */
+ public static List<LocalResource> localizeTempFiles(Configuration conf) {
+ List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+
+ String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
+ String addedJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEADDEDJARS);
+ String addedFiles = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEADDEDFILES);
+ String addedArchives = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES);
+
+ // TODO: localize the jars, files and archives read above; for now an
+ // empty list is returned
+
+ return tmpResources;
+ }
+
+ /**
+ * Creates a local resource representing the hive-exec jar. This resource will
+ * be used to execute the plan on the cluster.
+ */
+ public static LocalResource createHiveExecLocalResource(Path mrScratchDir) {
+ // TODO: not implemented yet; copy hive-exec.jar to mrScratchDir and
+ // register it via createLocalResource. Callers get null until then.
+ return null;
+ }
+
+ /**
+ * Creates and initializes a JobConf object that can be used to execute
+ * the DAG. The configuration object will contain configurations from mapred-site
+ * overlaid with key/value pairs from the hiveConf object. Finally it will also
+ * contain some hive specific configurations that do not change from DAG to DAG.
+ *
+ * @param hiveConf Current hiveConf for the execution
+ * @return JobConf base configuration for job execution
+ * @throws IOException
+ */
+ public static JobConf createConfiguration(HiveConf hiveConf) throws IOException {
+ hiveConf.setBoolean("mapred.mapper.new-api", false);
+
+ JobConf conf = (JobConf) MRHelpers.getBaseMRConfiguration();
+ MRHelpers.doJobClientMagic(conf);
+
+ for (Map.Entry<String, String> entry: hiveConf) {
+ if (conf.get(entry.getKey()) == null) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ conf.set("mapreduce.framework.name","yarn-tez");
+ conf.set("mapreduce.job.output.committer.class", NullOutputCommitter.class.getName());
+
+ conf.setBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, false);
+ conf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+
+ conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, HiveOutputFormatImpl.class, OutputFormat.class);
+
+ conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
+
+ conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
+ conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
+
+ conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+
+ return conf;
+ }
+
+ /**
+ * Creates and initializes the JobConf object for a given BaseWork object.
+ *
+ * @param conf Any configurations in conf will be copied to the resulting new JobConf object.
+ * @param work BaseWork will be used to populate the configuration object.
+ * @return JobConf new configuration object
+ */
+ public static JobConf initializeVertexConf(JobConf conf, BaseWork work) {
+
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork.
+ if (work instanceof MapWork) {
+ return initializeVertexConf(conf, (MapWork)work);
+ } else if (work instanceof ReduceWork) {
+ return initializeVertexConf(conf, (ReduceWork)work);
+ } else {
+ assert false;
+ return null;
+ }
+ }
+
+ /**
+ * Create a vertex from a given work object.
+ *
+ * @param conf JobConf to be used to this execution unit
+ * @param work The instance of BaseWork representing the actual work to be performed
+ * by this vertex.
+ * @param scratchDir HDFS scratch dir for this execution unit.
+ * @param seqNo Sequence number of this vertex within the DAG. Used to name the vertex.
+ * @param appJarLr Local resource for hive-exec.
+ * @param additionalLr Additional local resources (user supplied jars, files, archives)
+ * @param fileSystem FS corresponding to scratchDir and LocalResources
+ * @param ctx This query's context
+ * @return Vertex
+ */
+ public static Vertex createVertex(JobConf conf, BaseWork work,
+ Path scratchDir, int seqNo, LocalResource appJarLr, List<LocalResource> additionalLr,
+ FileSystem fileSystem, Context ctx) throws Exception {
+
+ // simply dispatch the call to the right method for the actual (sub-) type of
+ // BaseWork. note that the private overloads take the scratch dir after the
+ // file system, so the arguments are reordered here.
+ if (work instanceof MapWork) {
+ return createVertex(conf, (MapWork) work, seqNo, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else if (work instanceof ReduceWork) {
+ return createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
+ additionalLr, fileSystem, scratchDir, ctx);
+ } else {
+ assert false;
+ return null;
+ }
+ }
+
+ private DagUtils() {
+ // don't instantiate
+ }
+}
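For context, a minimal sketch of how the DagUtils helpers compose into a DAG.
This mirrors what TezTask.build below does; hiveConf, mapWork, reduceWork,
scratchDir, fs and ctx are assumed to be supplied by the caller, and the stubbed
methods noted above still return null/empty values:

  // sketch only: one MapWork feeding one ReduceWork
  JobConf baseConf = DagUtils.createConfiguration(hiveConf);
  List<LocalResource> additionalLr = DagUtils.localizeTempFiles(baseConf);
  LocalResource appJarLr = DagUtils.createHiveExecLocalResource(scratchDir);

  JobConf mapConf = DagUtils.initializeVertexConf(baseConf, mapWork);
  JobConf redConf = DagUtils.initializeVertexConf(baseConf, reduceWork);

  DAG dag = new DAG("example dag");
  Vertex map = DagUtils.createVertex(mapConf, mapWork, scratchDir, 1,
      appJarLr, additionalLr, fs, ctx);
  Vertex red = DagUtils.createVertex(redConf, reduceWork, scratchDir, 2,
      appJarLr, additionalLr, fs, ctx);
  dag.addVertex(map);
  dag.addVertex(red);
  // stable bipartite shuffle edge between the two vertices
  dag.addEdge(DagUtils.createEdge(mapConf, map, redConf, red));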
Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1510877&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Aug 6 07:42:27 2013
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+/**
+ *
+ * TezTask handles the execution of TezWork. Currently it executes a graph of map and reduce work
+ * using the Tez APIs directly.
+ *
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public class TezTask extends Task<TezWork> {
+
+ public TezTask() {
+ super();
+ }
+
+ @Override
+ public int execute(DriverContext driverContext) {
+ int rc = 1;
+
+ Context ctx = driverContext.getCtx();
+
+ try {
+
+ // we will localize all the files (jars, plans, hashtables) to the
+ // scratch dir. let's create this first.
+ Path scratchDir = new Path(ctx.getMRScratchDir());
+
+ // jobConf will hold all the configuration for hadoop, tez, and hive
+ JobConf jobConf = DagUtils.createConfiguration(conf);
+
+ // unless already installed on all the cluster nodes, we'll have to
+ // localize hive-exec.jar as well.
+ LocalResource appJarLr = DagUtils.createHiveExecLocalResource(scratchDir);
+
+ // next we translate the TezWork to a Tez DAG
+ DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
+
+ // submit will send the job to the cluster and start executing
+ DAGClient client = submit(jobConf, dag, scratchDir, appJarLr);
+
+ // finally monitor will print progress until the job is done
+ TezJobMonitor monitor = new TezJobMonitor();
+ rc = monitor.monitorExecution(client);
+
+ } catch (Exception e) {
+ LOG.error("Failed to execute tez graph.", e);
+ } finally {
+ Utilities.clearWork(conf);
+ try {
+ ctx.clear();
+ } catch (Exception e) {
+ // best effort cleanup only; don't mask the job's return code
+ LOG.warn("Failed to clean up after tez job", e);
+ }
+ }
+ return rc;
+ }
+
+ private DAG build(JobConf conf, TezWork work, Path scratchDir,
+ LocalResource appJarLr, Context ctx)
+ throws Exception {
+
+ Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
+ Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
+
+ // we need to get the user specified local resources for this dag
+ List<LocalResource> additionalLr = DagUtils.localizeTempFiles(conf);
+
+ // getAllWork returns a topologically sorted list, which we use to make
+ // sure that vertices are created before they are used in edges.
+ List<BaseWork> ws = work.getAllWork();
+ Collections.reverse(ws);
+
+ // the name of the dag is what is displayed in the AM/Job UI
+ DAG dag = new DAG(
+ Utilities.abbreviate(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING),
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEJOBNAMELENGTH)));
+
+ int i = ws.size();
+ for (BaseWork w: ws) {
+
+ // translate work to vertex
+ JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
+ Vertex wx = DagUtils.createVertex(wxConf, w, scratchDir, i--,
+ appJarLr, additionalLr, scratchDir.getFileSystem(conf), ctx);
+ dag.addVertex(wx);
+ workToVertex.put(w, wx);
+ workToConf.put(w, wxConf);
+
+ // add all dependencies (i.e.: edges) to the graph
+ for (BaseWork v: work.getChildren(w)) {
+ assert workToVertex.containsKey(v);
+ Edge e = DagUtils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v));
+ dag.addEdge(e);
+ }
+ }
+
+ return dag;
+ }
+
+ private DAGClient submit(JobConf conf, DAG dag, Path scratchDir, LocalResource appJarLr)
+ throws IOException, TezException, InterruptedException {
+
+ TezClient tezClient = new TezClient(new TezConfiguration(conf));
+
+ // environment variables used by application master
+ Map<String,String> amEnv = new HashMap<String, String>();
+ MRHelpers.updateEnvironmentForMRTasks(conf, amEnv, false);
+
+ // setup local resources used by application master
+ Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
+ amLrs.put(appJarLr.getResource().getFile(), appJarLr);
+
+ // ready to start execution on the cluster
+ DAGClient dagClient = tezClient.submitDAGApplication(dag, scratchDir,
+ null, "default", Collections.singletonList(""), amEnv, amLrs,
+ new TezConfiguration(conf));
+
+ return dagClient;
+ }
+
+ @Override
+ public boolean isMapRedTask() {
+ return true;
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.MAPRED;
+ }
+
+ @Override
+ public String getName() {
+ return "TEZ";
+ }
+}
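Note on TezJobMonitor: the class used in execute() above is not part of this
commit. A minimal sketch of the polling loop it will need, assuming the Tez
DAGClient.getDAGStatus()/DAGStatus.getState() API; everything except the class
and method name (which TezTask fixes) is an assumption:

  package org.apache.hadoop.hive.ql.exec.tez;

  import org.apache.tez.dag.api.client.DAGClient;
  import org.apache.tez.dag.api.client.DAGStatus;

  public class TezJobMonitor {
    // returns 0 on success and non-zero otherwise, matching what
    // TezTask.execute expects as its return code
    public int monitorExecution(DAGClient client) throws Exception {
      while (true) {
        DAGStatus status = client.getDAGStatus();
        switch (status.getState()) {
        case SUCCEEDED:
          return 0;
        case FAILED:
        case KILLED:
        case ERROR:
          return 2;
        default:
          Thread.sleep(1000); // still running; poll again shortly
        }
      }
    }
  }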