Posted to commits@hive.apache.org by vg...@apache.org on 2019/01/26 01:59:09 UTC
hive git commit: HIVE-20847: Review of NullScan Code (Beluga Behr, reviewed by Vineet Garg)
Repository: hive
Updated Branches:
refs/heads/master 6d4b19b55 -> a3bac4d7e
HIVE-20847: Review of NullScan Code (Beluga Behr, reviewed by Vineet Garg)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a3bac4d7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a3bac4d7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a3bac4d7
Branch: refs/heads/master
Commit: a3bac4d7e508268ff9855e16e965e364a3ab46c2
Parents: 6d4b19b
Author: Beluga Behr <da...@gmail.com>
Authored: Fri Jan 25 17:56:52 2019 -0800
Committer: Vineet Garg <vg...@apache.org>
Committed: Fri Jan 25 17:58:59 2019 -0800
----------------------------------------------------------------------
.../optimizer/physical/NullScanOptimizer.java | 83 ++++++++-------
.../physical/NullScanTaskDispatcher.java | 101 ++++++++++---------
2 files changed, 95 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a3bac4d7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
index bc8afbf..282805d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanOptimizer.java
@@ -18,20 +18,19 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
+import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
import java.util.Stack;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -45,6 +44,8 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer.Walker
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This optimizer attempts following two optimizations:
@@ -54,27 +55,32 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
*/
public class NullScanOptimizer implements PhysicalPlanResolver {
- private static final Logger LOG = LoggerFactory.getLogger(NullScanOptimizer.class.getName());
- @Override
- public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(NullScanOptimizer.class);
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%.*" +
- FilterOperator.getOperatorName() + "%"), new WhereFalseProcessor());
+ @Override
+ public PhysicalContext resolve(PhysicalContext pctx)
+ throws SemanticException {
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<>();
+ opRules.put(
+ new RuleRegExp("R1",
+ TableScanOperator.getOperatorName() + "%.*"
+ + FilterOperator.getOperatorName() + "%"),
+ new WhereFalseProcessor());
Dispatcher disp = new NullScanTaskDispatcher(pctx, opRules);
GraphWalker ogw = new DefaultGraphWalker(disp);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.getRootTasks());
+ List<Node> topNodes = new ArrayList<>(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
opRules.clear();
- opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName()+ "%"),new TSMarker());
- opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName()+ "%"), new Limit0Processor());
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+ new TSMarker());
+ opRules.put(new RuleRegExp("R2", LimitOperator.getOperatorName() + "%"),
+ new Limit0Processor());
disp = new NullScanTaskDispatcher(pctx, opRules);
ogw = new DefaultGraphWalker(disp);
- topNodes = new ArrayList<Node>();
- topNodes.addAll(pctx.getRootTasks());
+ topNodes = new ArrayList<>(pctx.getRootTasks());
ogw.startWalking(topNodes, null);
return pctx;
}
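
The resolve() rewrite above is behavior-preserving cleanup: the diamond
operator replaces explicit type arguments, and the two-step
create-then-addAll becomes the ArrayList copy constructor. A minimal,
self-contained sketch of that last idiom (class and variable names here
are hypothetical, not from the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CopyConstructorSketch {
      public static void main(String[] args) {
        List<String> rootTasks = Arrays.asList("task1", "task2");

        // Before: explicit type argument, then a second call to copy.
        List<String> topNodesOld = new ArrayList<String>();
        topNodesOld.addAll(rootTasks);

        // After: the copy constructor fills the list in one step and the
        // diamond operator lets the compiler infer the type argument.
        List<String> topNodes = new ArrayList<>(rootTasks);

        System.out.println(topNodes.equals(topNodesOld)); // true
      }
    }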
@@ -82,30 +88,28 @@ public class NullScanOptimizer implements PhysicalPlanResolver {
//We need to make sure that a Null Operator (LIM or FIL) is present in all branches of a multi-insert query before
//applying the optimization. This method does a full tree traversal starting from the TS and will return true only if
//it finds the target Null operator on each branch.
- static private boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) {
- Node curNode = null;
- List<? extends Node> curChd = null;
- LinkedList<Node> middleNodes = new LinkedList<Node>();
- middleNodes.addLast(ts);
+ private static boolean isNullOpPresentInAllBranches(TableScanOperator ts, Node causeOfNullNode) {
+ Queue<Node> middleNodes = new ArrayDeque<>();
+ middleNodes.add(ts);
while (!middleNodes.isEmpty()) {
- curNode = middleNodes.remove();
- curChd = curNode.getChildren();
+ Node curNode = middleNodes.remove();
+ List<? extends Node> curChd = curNode.getChildren();
for (Node chd: curChd) {
- if (chd.getChildren() == null || chd.getChildren().isEmpty() || chd == causeOfNullNode) {
- if (chd != causeOfNullNode) { // If there is an end node that not the limit0/wherefalse..
+ List<? extends Node> children = chd.getChildren();
+ if (CollectionUtils.isEmpty(children) || chd == causeOfNullNode) {
+ // If there is an end node that is not the limit0/wherefalse..
+ if (chd != causeOfNullNode) {
return false;
}
- }
- else {
- middleNodes.addLast(chd);
+ } else {
+ middleNodes.add(chd);
}
}
-
}
return true;
}
- static private class WhereFalseProcessor implements NodeProcessor {
+ private static class WhereFalseProcessor implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
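
The isNullOpPresentInAllBranches() hunk above swaps a LinkedList used as
a FIFO for an ArrayDeque behind the Queue interface; ArrayDeque avoids
LinkedList's per-element node allocations and is usually the faster
queue when nulls are never stored. A self-contained sketch of the same
breadth-first check, with a toy Node class standing in for Hive's
operator graph (all names below are hypothetical):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Queue;

    public class BfsSketch {
      // Toy stand-in for Hive's Node interface.
      static class Node {
        final List<Node> children;
        Node(Node... chd) { this.children = Arrays.asList(chd); }
      }

      // True only if every branch from root ends at the target node.
      static boolean allBranchesEndAt(Node root, Node target) {
        Queue<Node> middleNodes = new ArrayDeque<>();
        middleNodes.add(root);
        while (!middleNodes.isEmpty()) {
          Node cur = middleNodes.remove();
          for (Node chd : cur.children) {
            if (chd.children.isEmpty() || chd == target) {
              if (chd != target) {
                return false; // a branch ends at some other node
              }
            } else {
              middleNodes.add(chd);
            }
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Node limit0 = new Node();
        Node allEnd = new Node(new Node(limit0), new Node(limit0));
        Node oneStray = new Node(new Node(limit0), new Node(new Node()));
        System.out.println(allBranchesEndAt(allEnd, limit0));   // true
        System.out.println(allBranchesEndAt(oneStray, limit0)); // false
      }
    }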
@@ -126,7 +130,7 @@ public class NullScanOptimizer implements PhysicalPlanResolver {
if (op instanceof TableScanOperator) {
if (isNullOpPresentInAllBranches((TableScanOperator)op, filter)) {
ctx.setMayBeMetadataOnly((TableScanOperator)op);
- LOG.info("Found where false TableScan. " + op);
+ LOG.debug("Found where false TableScan. {}", op);
}
}
}
@@ -135,32 +139,33 @@ public class NullScanOptimizer implements PhysicalPlanResolver {
}
}
- static private class Limit0Processor implements NodeProcessor {
+ private static class Limit0Processor implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
LimitOperator limitOp = (LimitOperator)nd;
- if(!(limitOp.getConf().getLimit() == 0)) {
+ if (!(limitOp.getConf().getLimit() == 0)) {
return null;
}
- HashSet<TableScanOperator> tsOps = ((WalkerCtx)procCtx).getMayBeMetadataOnlyTableScans();
+ Set<TableScanOperator> tsOps =
+ ((WalkerCtx) procCtx).getMayBeMetadataOnlyTableScans();
if (tsOps != null) {
for (Iterator<TableScanOperator> tsOp = tsOps.iterator(); tsOp.hasNext();) {
- if (!isNullOpPresentInAllBranches(tsOp.next(),limitOp))
+ if (!isNullOpPresentInAllBranches(tsOp.next(), limitOp)) {
tsOp.remove();
+ }
}
}
- LOG.info("Found Limit 0 TableScan. " + nd);
+ LOG.debug("Found Limit 0 TableScan. {}", nd);
((WalkerCtx)procCtx).convertMetadataOnly();
return null;
}
-
}
- static private class TSMarker implements NodeProcessor {
+ private static class TSMarker implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
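
Several hunks in this file also change LOG.info("..." + op) into
LOG.debug("... {}", op). Besides the level change, SLF4J's {} placeholder
defers both the string concatenation and op.toString() until the logger
has confirmed the level is enabled. A minimal sketch (the operator
stand-in is hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingSketch.class);

      public static void main(String[] args) {
        Object op = "TS[0]"; // stand-in for an operator with a costly toString()

        // Before: the argument string is built even when the level is off.
        LOG.info("Found where false TableScan. " + op);

        // After: op.toString() runs only if DEBUG is actually enabled.
        LOG.debug("Found where false TableScan. {}", op);
      }
    }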
http://git-wip-us.apache.org/repos/asf/hive/blob/a3bac4d7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
index 6c0e71d..ec9813d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java
@@ -19,33 +19,31 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
import java.io.IOException;
-
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-
-import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.Stack;
+import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StringInternUtils;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.NullScanFileSystem;
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
+import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -61,39 +59,44 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.NullStructSerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Iterate over all tasks one by one and removes all input paths from task if conditions as
- * defined in rules match.
+ * Iterates over the tasks one by one and removes all input paths from a
+ * task if the conditions defined in the rules match.
*/
public class NullScanTaskDispatcher implements Dispatcher {
- static final Logger LOG = LoggerFactory.getLogger(NullScanTaskDispatcher.class.getName());
+ static final Logger LOG =
+ LoggerFactory.getLogger(NullScanTaskDispatcher.class);
private final PhysicalContext physicalContext;
private final Map<Rule, NodeProcessor> rules;
- public NullScanTaskDispatcher(PhysicalContext context, Map<Rule, NodeProcessor> rules) {
+ public NullScanTaskDispatcher(PhysicalContext context,
+ Map<Rule, NodeProcessor> rules) {
super();
- physicalContext = context;
+ this.physicalContext = context;
this.rules = rules;
}
private String getAliasForTableScanOperator(MapWork work,
TableScanOperator tso) {
-
- for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
- work.getAliasToWork().entrySet()) {
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : work
+ .getAliasToWork().entrySet()) {
if (entry.getValue() == tso) {
return entry.getKey();
}
}
-
return null;
}
- private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc, Path path) {
- if (desc == null) return null;
+ private PartitionDesc changePartitionToMetadataOnly(PartitionDesc desc,
+ Path path) {
+ if (desc == null) {
+ return null;
+ }
boolean isEmpty = false;
try {
isEmpty = Utilities.isEmptyPath(physicalContext.getConf(), path);
@@ -104,25 +107,23 @@ public class NullScanTaskDispatcher implements Dispatcher {
isEmpty ? ZeroRowsInputFormat.class : OneNullRowInputFormat.class);
desc.setOutputFileFormatClass(HiveIgnoreKeyTextOutputFormat.class);
desc.getProperties().setProperty(serdeConstants.SERIALIZATION_LIB,
- NullStructSerDe.class.getName());
+ NullStructSerDe.class.getName());
return desc;
}
- private void processAlias(MapWork work, Path path, ArrayList<String> aliasesAffected,
- ArrayList<String> aliases) {
+ private void processAlias(MapWork work, Path path,
+ Collection<String> aliasesAffected, Set<String> aliases) {
// the aliases that are allowed to map to a null scan.
- ArrayList<String> allowed = new ArrayList<String>();
- for (String alias : aliasesAffected) {
- if (aliases.contains(alias)) {
- allowed.add(alias);
- }
- }
- if (allowed.size() > 0) {
+ Collection<String> allowed = aliasesAffected.stream()
+ .filter(a -> aliases.contains(a)).collect(Collectors.toList());
+ if (!allowed.isEmpty()) {
PartitionDesc partDesc = work.getPathToPartitionInfo().get(path).clone();
- PartitionDesc newPartition = changePartitionToMetadataOnly(partDesc, path);
+ PartitionDesc newPartition =
+ changePartitionToMetadataOnly(partDesc, path);
// Prefix partition with something to avoid it being a hidden file.
- Path fakePath = new Path(NullScanFileSystem.getBase() + newPartition.getTableName()
- + "/part" + encode(newPartition.getPartSpec()));
+ Path fakePath =
+ new Path(NullScanFileSystem.getBase() + newPartition.getTableName()
+ + "/part" + encode(newPartition.getPartSpec()));
StringInternUtils.internUriStringsInPath(fakePath);
work.addPathToPartitionInfo(fakePath, newPartition);
work.addPathToAlias(fakePath, new ArrayList<>(allowed));
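
In processAlias() above, the hand-rolled allow-list loop becomes a
stream pipeline: filter() keeps the affected aliases that are also null
scan candidates, and collect() materializes the result. A standalone
sketch with made-up alias values:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class StreamFilterSketch {
      public static void main(String[] args) {
        Collection<String> aliasesAffected = Arrays.asList("a", "b", "c");
        Set<String> aliases = new HashSet<>(Arrays.asList("a", "c"));

        // Keep only the affected aliases allowed to map to a null scan.
        Collection<String> allowed = aliasesAffected.stream()
            .filter(aliases::contains)
            .collect(Collectors.toList());

        System.out.println(allowed); // [a, c]
      }
    }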
@@ -134,12 +135,11 @@ public class NullScanTaskDispatcher implements Dispatcher {
}
}
- private void processAlias(MapWork work, HashSet<TableScanOperator> tableScans) {
- ArrayList<String> aliases = new ArrayList<String>();
+ private void processAlias(MapWork work, Set<TableScanOperator> tableScans) {
+ Set<String> aliases = new HashSet<>();
for (TableScanOperator tso : tableScans) {
// use LinkedHashMap<String, Operator<? extends OperatorDesc>>
- // getAliasToWork()
- // should not apply this for non-native table
+ // getAliasToWork() should not apply this for non-native table
if (tso.getConf().getTableMetadata().getStorageHandler() != null) {
continue;
}
@@ -148,10 +148,10 @@ public class NullScanTaskDispatcher implements Dispatcher {
tso.getConf().setIsMetadataOnly(true);
}
// group path alias according to work
- LinkedHashMap<Path, ArrayList<String>> candidates = new LinkedHashMap<>();
+ Map<Path, ArrayList<String>> candidates = new HashMap<>();
for (Path path : work.getPaths()) {
ArrayList<String> aliasesAffected = work.getPathToAliases().get(path);
- if (aliasesAffected != null && aliasesAffected.size() > 0) {
+ if (CollectionUtils.isNotEmpty(aliasesAffected)) {
candidates.put(path, aliasesAffected);
}
}
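
The null-and-size test on aliasesAffected becomes
CollectionUtils.isNotEmpty(), the null-safe helper from Apache Commons
Collections that this patch also uses (as isEmpty) in NullScanOptimizer.
A small sketch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.commons.collections.CollectionUtils;

    public class NullSafeEmptySketch {
      public static void main(String[] args) {
        List<String> present = Arrays.asList("alias1");
        List<String> absent = null;

        // isNotEmpty folds the null check and the size check into one call.
        System.out.println(CollectionUtils.isNotEmpty(present)); // true
        System.out.println(CollectionUtils.isNotEmpty(absent));  // false

        // isEmpty is the mirror image used in isNullOpPresentInAllBranches.
        System.out.println(CollectionUtils.isEmpty(absent));     // true
      }
    }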
@@ -183,10 +183,10 @@ public class NullScanTaskDispatcher implements Dispatcher {
});
for (MapWork mapWork : mapWorks) {
- LOG.debug("Looking at: "+mapWork.getName());
- Collection<Operator<? extends OperatorDesc>> topOperators
- = mapWork.getAliasToWork().values();
- if (topOperators.size() == 0) {
+ LOG.debug("Looking at: {}", mapWork.getName());
+ Collection<Operator<? extends OperatorDesc>> topOperators =
+ mapWork.getAliasToWork().values();
+ if (topOperators.isEmpty()) {
LOG.debug("No top operators");
return null;
}
@@ -199,11 +199,11 @@ public class NullScanTaskDispatcher implements Dispatcher {
GraphWalker ogw = new PreOrderOnceWalker(disp);
// Create a list of topOp nodes
- ArrayList<Node> topNodes = new ArrayList<Node>();
+ ArrayList<Node> topNodes = new ArrayList<>();
// Get the top Nodes for this task
- for (Operator<? extends OperatorDesc>
- workOperator : topOperators) {
- if (parseContext.getTopOps().values().contains(workOperator)) {
+ Collection<TableScanOperator> topOps = parseContext.getTopOps().values();
+ for (Operator<? extends OperatorDesc> workOperator : topOperators) {
+ if (topOps.contains(workOperator)) {
topNodes.add(workOperator);
}
}
@@ -215,10 +215,11 @@ public class NullScanTaskDispatcher implements Dispatcher {
ogw.startWalking(topNodes, null);
- LOG.debug(String.format("Found %d null table scans",
- walkerCtx.getMetadataOnlyTableScans().size()));
- if (walkerCtx.getMetadataOnlyTableScans().size() > 0)
+ int scanTableSize = walkerCtx.getMetadataOnlyTableScans().size();
+ LOG.debug("Found {} null table scans", scanTableSize);
+ if (scanTableSize > 0) {
processAlias(mapWork, walkerCtx.getMetadataOnlyTableScans());
+ }
}
return null;
}