You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/04/12 20:25:26 UTC
[hive] branch master updated: HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9ae2f HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)
7d9ae2f is described below
commit 7d9ae2f2c7879663618de9dffb482f731742b36e
Author: Mustafa Iman <mu...@gmail.com>
AuthorDate: Sun Apr 12 13:24:38 2020 -0700
HIVE-23180 : Remove unused variables from tez build dag (Mustafa Iman via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 81 ++++++++++------------
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 62 ++++++++---------
.../hive/ql/udf/generic/GenericUDTFGetSplits.java | 5 +-
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 4 +-
4 files changed, 67 insertions(+), 85 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 9b2dae3..6f5830d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -153,7 +153,6 @@ import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.input.MultiMRInput;
-import org.apache.hadoop.hive.ql.exec.tez.NullMROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
@@ -667,15 +666,14 @@ public class DagUtils {
}
}
- private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs,
- Path mrScratchDir, Context ctx, VertexType vertexType,
- Map<String, LocalResource> localResources) throws Exception {
+ private Vertex createVertexFromMergeWork(JobConf conf, MergeJoinWork mergeJoinWork,
+ Path mrScratchDir, VertexType vertexType) throws Exception {
Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
if (mergeJoinWork.getMainWork() instanceof MapWork) {
List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
- Vertex mergeVx = createVertex(
- conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
+ Vertex mergeVx = createVertexFromMapWork(
+ conf, mapWork, mrScratchDir, vertexType);
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set
@@ -718,17 +716,16 @@ public class DagUtils {
mergeVx.setVertexManagerPlugin(desc);
return mergeVx;
} else {
- return createVertex(conf,
- (ReduceWork) mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
+ return createVertexFromReduceWork(conf,
+ (ReduceWork) mergeJoinWork.getMainWork(), mrScratchDir);
}
}
/*
* Helper function to create Vertex from MapWork.
*/
- private Vertex createVertex(JobConf conf, MapWork mapWork,
- FileSystem fs, Path mrScratchDir, Context ctx, VertexType vertexType,
- Map<String, LocalResource> localResources) throws Exception {
+ private Vertex createVertexFromMapWork(JobConf conf, MapWork mapWork, Path mrScratchDir,
+ VertexType vertexType) throws Exception {
// set up the operator plan
Utilities.cacheMapWork(conf, mapWork, mrScratchDir);
@@ -846,21 +843,16 @@ public class DagUtils {
procClassName = MergeFileTezProcessor.class.getName();
}
- VertexExecutionContext executionContext = createVertexExecutionContext(mapWork);
-
map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
.setUserPayload(serializedConf), numTasks, getContainerResource(conf));
map.setTaskEnvironment(getContainerEnvironment(conf, true));
- map.setExecutionContext(executionContext);
- map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
// Add the actual source input
String alias = mapWork.getAliasToWork().keySet().iterator().next();
map.addDataSource(alias, dataSource);
- map.addTaskLocalFiles(localResources);
return map;
}
@@ -899,8 +891,7 @@ public class DagUtils {
/*
* Helper function to create Vertex for given ReduceWork.
*/
- private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs,
- Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources)
+ private Vertex createVertexFromReduceWork(JobConf conf, ReduceWork reduceWork, Path mrScratchDir)
throws Exception {
// set up operator plan
@@ -910,8 +901,6 @@ public class DagUtils {
// create the directories FileSinkOperators need
Utilities.createTmpDirs(conf, reduceWork);
- VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(reduceWork);
-
// create the vertex
Vertex reducer = Vertex.create(reduceWork.getName(),
ProcessorDescriptor.create(ReduceTezProcessor.class.getName()).
@@ -921,9 +910,6 @@ public class DagUtils {
reduceWork.getNumReduceTasks(), getContainerResource(conf));
reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
- reducer.setExecutionContext(vertexExecutionContext);
- reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
- reducer.addTaskLocalFiles(localResources);
return reducer;
}
@@ -1444,40 +1430,38 @@ public class DagUtils {
* Create a vertex from a given work object.
*
* @param conf JobConf to be used to this execution unit
- * @param work The instance of BaseWork representing the actual work to be performed
+ * @param workUnit The instance of BaseWork representing the actual work to be performed
* by this vertex.
* @param scratchDir HDFS scratch dir for this execution unit.
- * @param fileSystem FS corresponding to scratchDir and LocalResources
- * @param ctx This query's context
* @return Vertex
*/
@SuppressWarnings("deprecation")
- public Vertex createVertex(JobConf conf, BaseWork work,
- Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren,
- TezWork tezWork, VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
+ public Vertex createVertex(JobConf conf, BaseWork workUnit, Path scratchDir,
+ TezWork tezWork, Map<String, LocalResource> localResources) throws Exception {
- Vertex v = null;
+ Vertex vertex;
// simply dispatch the call to the right method for the actual (sub-) type of
// BaseWork.
- if (work instanceof MapWork) {
- v = createVertex(
- conf, (MapWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
- } else if (work instanceof ReduceWork) {
- v = createVertex(conf, (ReduceWork) work, fileSystem, scratchDir, ctx, localResources);
- } else if (work instanceof MergeJoinWork) {
- v = createVertex(
- conf, (MergeJoinWork) work, fileSystem, scratchDir, ctx, vertexType, localResources);
+ VertexType vertexType = tezWork.getVertexType(workUnit);
+ if (workUnit instanceof MapWork) {
+ vertex = createVertexFromMapWork(
+ conf, (MapWork) workUnit, scratchDir, vertexType);
+ } else if (workUnit instanceof ReduceWork) {
+ vertex = createVertexFromReduceWork(conf, (ReduceWork) workUnit, scratchDir);
+ } else if (workUnit instanceof MergeJoinWork) {
+ vertex = createVertexFromMergeWork(
+ conf, (MergeJoinWork) workUnit, scratchDir, vertexType);
// set VertexManagerPlugin if it's a cross product destination vertex
List<String> crossProductSources = new ArrayList<>();
- for (BaseWork parentWork : tezWork.getParents(work)) {
- if (tezWork.getEdgeType(parentWork, work) == EdgeType.XPROD_EDGE) {
+ for (BaseWork parentWork : tezWork.getParents(workUnit)) {
+ if (tezWork.getEdgeType(parentWork, workUnit) == EdgeType.XPROD_EDGE) {
crossProductSources.add(parentWork.getName());
}
}
if (!crossProductSources.isEmpty()) {
CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
- v.setVertexManagerPlugin(
+ vertex.setVertexManagerPlugin(
VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName())
.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf))));
// parallelism shouldn't be set for cartesian product vertex
@@ -1486,14 +1470,18 @@ public class DagUtils {
// something is seriously wrong if this is happening
throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
}
+ VertexExecutionContext vertexExecutionContext = createVertexExecutionContext(workUnit);
+ vertex.addTaskLocalFiles(localResources);
+ vertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+ vertex.setExecutionContext(vertexExecutionContext);
// initialize stats publisher if necessary
- if (work.isGatheringStats()) {
+ if (workUnit.isGatheringStats()) {
StatsPublisher statsPublisher;
StatsFactory factory = StatsFactory.newFactory(conf);
if (factory != null) {
StatsCollectionContext sCntxt = new StatsCollectionContext(conf);
- sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, conf));
+ sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(workUnit, conf));
statsPublisher = factory.getStatsPublisher();
if (!statsPublisher.init(sCntxt)) { // creating stats table if not exists
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
@@ -1512,13 +1500,14 @@ public class DagUtils {
outputKlass = MROutput.class;
}
// final vertices need to have at least one output
- if (!hasChildren) {
- v.addDataSink("out_"+work.getName(), new DataSinkDescriptor(
+ boolean endVertex = tezWork.getLeaves().contains(workUnit);
+ if (endVertex) {
+ vertex.addDataSink("out_"+workUnit.getName(), new DataSinkDescriptor(
OutputDescriptor.create(outputKlass.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(conf)), null, null));
}
- return v;
+ return vertex;
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3599d19..854bc89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -34,7 +34,6 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -404,17 +403,15 @@ public class TezTask extends Task<TezWork> {
}
}
- DAG build(JobConf conf, TezWork work, Path scratchDir, Context ctx,
+ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx,
Map<String, LocalResource> vertexResources) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
// getAllWork returns a topologically sorted list, which we use to make
// sure that vertices are created before they are used in edges.
- List<BaseWork> ws = work.getAllWork();
- Collections.reverse(ws);
-
- FileSystem fs = scratchDir.getFileSystem(conf);
+ List<BaseWork> topologicalWorkList = tezWork.getAllWork();
+ Collections.reverse(topologicalWorkList);
// the name of the dag is what is displayed in the AM/Job UI
String dagName = utils.createDagName(conf, queryPlan);
@@ -435,13 +432,12 @@ public class TezTask extends Task<TezWork> {
dag.setCredentials(conf.getCredentials());
setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf);
- for (BaseWork w: ws) {
- boolean isFinal = work.getLeaves().contains(w);
+ for (BaseWork workUnit: topologicalWorkList) {
// translate work to vertex
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
- if (w instanceof UnionWork) {
+ if (workUnit instanceof UnionWork) {
// Special case for unions. These items translate to VertexGroups
List<BaseWork> unionWorkItems = new LinkedList<BaseWork>();
@@ -449,8 +445,8 @@ public class TezTask extends Task<TezWork> {
// split the children into vertices that make up the union and vertices that are
// proper children of the union
- for (BaseWork v: work.getChildren(w)) {
- EdgeType type = work.getEdgeProperty(w, v).getEdgeType();
+ for (BaseWork v: tezWork.getChildren(workUnit)) {
+ EdgeType type = tezWork.getEdgeProperty(workUnit, v).getEdgeType();
if (type == EdgeType.CONTAINS) {
unionWorkItems.add(v);
} else {
@@ -458,7 +454,7 @@ public class TezTask extends Task<TezWork> {
}
}
JobConf parentConf = workToConf.get(unionWorkItems.get(0));
- checkOutputSpec(w, parentConf);
+ checkOutputSpec(workUnit, parentConf);
// create VertexGroup
Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
@@ -467,7 +463,7 @@ public class TezTask extends Task<TezWork> {
for (BaseWork v: unionWorkItems) {
vertexArray[i++] = workToVertex.get(v);
}
- VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
+ VertexGroup group = dag.createVertexGroup(workUnit.getName(), vertexArray);
// For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
// Pick any one source vertex to figure out the Edge configuration.
@@ -476,47 +472,47 @@ public class TezTask extends Task<TezWork> {
for (BaseWork v: children) {
// finally we can create the grouped edge
GroupInputEdge e = utils.createEdge(group, parentConf,
- workToVertex.get(v), work.getEdgeProperty(w, v), v, work);
+ workToVertex.get(v), tezWork.getEdgeProperty(workUnit, v), v, tezWork);
dag.addEdge(e);
}
} else {
// Regular vertices
- JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
- checkOutputSpec(w, wxConf);
- Vertex wx = utils.createVertex(wxConf, w, scratchDir, fs, ctx, !isFinal,
- work, work.getVertexType(w), vertexResources);
- if (work.getChildren(w).size() > 1) {
- String value = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
+ JobConf wxConf = utils.initializeVertexConf(conf, ctx, workUnit);
+ checkOutputSpec(workUnit, wxConf);
+ Vertex wx = utils.createVertex(wxConf, workUnit, scratchDir,
+ tezWork, vertexResources);
+ if (tezWork.getChildren(workUnit).size() > 1) {
+ String tezRuntimeSortMb = wxConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB);
int originalValue = 0;
- if(value == null) {
+ if(tezRuntimeSortMb == null) {
originalValue = TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB_DEFAULT;
} else {
- originalValue = Integer.valueOf(value);
+ originalValue = Integer.valueOf(tezRuntimeSortMb);
}
- int newValue = (int) (originalValue / work.getChildren(w).size());
+ int newValue = originalValue / tezWork.getChildren(workUnit).size();
wxConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, Integer.toString(newValue));
LOG.info("Modified " + TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB + " to " + newValue);
}
- if (w.getReservedMemoryMB() > 0) {
+ if (workUnit.getReservedMemoryMB() > 0) {
// If reservedMemoryMB is set, make memory allocation fraction adjustment as needed
- double frac = DagUtils.adjustMemoryReserveFraction(w.getReservedMemoryMB(), super.conf);
+ double frac = DagUtils.adjustMemoryReserveFraction(workUnit.getReservedMemoryMB(), super.conf);
LOG.info("Setting " + TEZ_MEMORY_RESERVE_FRACTION + " to " + frac);
wx.setConf(TEZ_MEMORY_RESERVE_FRACTION, Double.toString(frac));
} // Otherwise just leave it up to Tez to decide how much memory to allocate
dag.addVertex(wx);
- utils.addCredentials(w, dag);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
- workToVertex.put(w, wx);
- workToConf.put(w, wxConf);
+ utils.addCredentials(workUnit, dag);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + workUnit.getName());
+ workToVertex.put(workUnit, wx);
+ workToConf.put(workUnit, wxConf);
// add all dependencies (i.e.: edges) to the graph
- for (BaseWork v: work.getChildren(w)) {
+ for (BaseWork v: tezWork.getChildren(workUnit)) {
assert workToVertex.containsKey(v);
Edge e = null;
- TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
- e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, work);
+ TezEdgeProperty edgeProp = tezWork.getEdgeProperty(workUnit, v);
+ e = utils.createEdge(wxConf, wx, workToVertex.get(v), edgeProp, v, tezWork);
dag.addEdge(e);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 389f5cc..00a6c89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -440,7 +440,6 @@ public class GenericUDTFGetSplits extends GenericUDTF {
JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
// TODO: should we also whitelist input formats here? from mapred.input.format.class
Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
- FileSystem fs = scratchDir.getFileSystem(job);
try {
LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(ctx.getConf()), utils, job);
@@ -453,8 +452,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
// Update the queryId to use the generated applicationId. See comment below about
// why this is done.
HiveConf.setVar(wxConf, HiveConf.ConfVars.HIVEQUERYID, applicationId.toString());
- Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, fs, ctx, false, work,
- work.getVertexType(mapWork), DagUtils.createTezLrMap(appJarLr, null));
+ Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, work,
+ DagUtils.createTezLrMap(appJarLr, null));
String vertexName = wx.getName();
dag.addVertex(wx);
utils.addCredentials(mapWork, dag);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index c14dc62..6f52d65 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.mapred.JobConf;
@@ -95,8 +94,7 @@ public class TestTezTask {
when(utils.getTezDir(any(Path.class))).thenReturn(path);
when(
utils.createVertex(any(JobConf.class), any(BaseWork.class), any(Path.class),
- any(FileSystem.class), any(Context.class),
- anyBoolean(), any(TezWork.class), any(VertexType.class), any(Map.class))).thenAnswer(
+ any(TezWork.class), any(Map.class))).thenAnswer(
new Answer<Vertex>() {
@Override