You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/07/08 03:57:20 UTC
[27/31] hive git commit: HIVE-10940: HiveInputFormat::pushFilters
serializes PPD objects for each getRecordReader call (Gunther Hagleitner
reviewed by Gopal V, Sergey Shelukhin)
HIVE-10940: HiveInputFormat::pushFilters serializes PPD objects for each getRecordReader call (Gunther Hagleitner reviewed by Gopal V, Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/afeed299
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/afeed299
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/afeed299
Branch: refs/heads/llap
Commit: afeed299451a651e5ac957b01999dae564f096a4
Parents: f790591
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue Jul 7 14:00:12 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue Jul 7 14:00:12 2015 -0700
----------------------------------------------------------------------
.../ql/optimizer/physical/SerializeFilter.java | 178 +++++++++++++++++++
.../hadoop/hive/ql/parse/TezCompiler.java | 8 +
.../hadoop/hive/ql/plan/TableScanDesc.java | 15 +-
3 files changed, 194 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
new file mode 100644
index 0000000..f3c1d42
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -0,0 +1,178 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+/**
+ * SerializeFilter is a simple physical optimizer that serializes all filter expressions in
+ * Tablescan Operators.
+ */
+public class SerializeFilter implements PhysicalPlanResolver {
+
+ protected static transient final Log LOG = LogFactory.getLog(SerializeFilter.class);
+
+ /**
+ * Task-level dispatcher: for every TezTask reached during the task-graph walk,
+ * visits each BaseWork vertex and serializes the filter expression/object held
+ * by any TableScanOperator it contains, so the backend does not have to
+ * re-serialize them per getRecordReader call (see HIVE-10940).
+ */
+ public class Serializer implements Dispatcher {
+
+ // Physical compilation context; held so nested evaluate* methods can pass it along.
+ private final PhysicalContext pctx;
+
+ public Serializer(PhysicalContext pctx) {
+ this.pctx = pctx;
+ }
+
+ /**
+ * Called once per task node. A StatsTask is unwrapped to its source task first;
+ * only TezTask instances are processed — all other task types are ignored.
+ * Always returns null (the walker does not use per-node outputs here).
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+ throws SemanticException {
+ Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+ if (currTask instanceof StatsTask) {
+ // Stats tasks wrap the task that actually produced the data; operate on that one.
+ currTask = ((StatsTask) currTask).getWork().getSourceTask();
+ }
+ if (currTask instanceof TezTask) {
+ TezWork work = ((TezTask) currTask).getWork();
+ for (BaseWork w : work.getAllWork()) {
+ evaluateWork(w);
+ }
+ }
+ return null;
+ }
+
+ // Routes a work vertex to the handler matching its concrete type.
+ // Work types other than Map/Reduce/MergeJoin cannot contain a table scan
+ // we care about, so they are only logged and skipped.
+ private void evaluateWork(BaseWork w) throws SemanticException {
+
+ if (w instanceof MapWork) {
+ evaluateMapWork((MapWork) w);
+ } else if (w instanceof ReduceWork) {
+ evaluateReduceWork((ReduceWork) w);
+ } else if (w instanceof MergeJoinWork) {
+ evaluateMergeWork((MergeJoinWork) w);
+ } else {
+ LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName());
+ }
+ }
+
+ // A MergeJoinWork aggregates several BaseWork vertices; process each one.
+ private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
+ for (BaseWork baseWork : w.getBaseWorkList()) {
+ evaluateOperators(baseWork, pctx);
+ }
+ }
+
+ private void evaluateReduceWork(ReduceWork w) throws SemanticException {
+ evaluateOperators(w, pctx);
+ }
+
+ private void evaluateMapWork(MapWork w) throws SemanticException {
+ evaluateOperators(w, pctx);
+ }
+
+ /**
+ * Walks the operator DAG rooted at the work's root operators, collects every
+ * TableScanOperator, then serializes each one's filter expression and filter
+ * object (when present) into the TableScanDesc so the serialized forms travel
+ * with the plan.
+ */
+ private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+
+ Dispatcher disp = null;
+ // LinkedHashSet keeps deterministic (insertion) order for the serialization pass below.
+ final Set<TableScanOperator> tableScans = new LinkedHashSet<TableScanOperator>();
+
+ Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+ rules.put(new RuleRegExp("TS finder",
+ TableScanOperator.getOperatorName() + "%"), new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) {
+ // Just record the table scan; actual work happens after the walk.
+ tableScans.add((TableScanOperator) nd);
+ return null;
+ }
+ });
+ disp = new DefaultRuleDispatcher(null, rules, null);
+
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(w.getAllRootOperators());
+
+ LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+ ogw.startWalking(topNodes, nodeOutput);
+
+ // Serialize filters on each collected table scan. Expression and object are
+ // handled independently: either may be null on its own.
+ for (TableScanOperator ts: tableScans) {
+ if (ts.getConf() != null && ts.getConf().getFilterExpr() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString());
+ }
+ ts.getConf().setSerializedFilterExpr(
+ Utilities.serializeExpression(ts.getConf().getFilterExpr()));
+ }
+
+ if (ts.getConf() != null && ts.getConf().getFilterObject() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Serializing: " + ts.getConf().getFilterObject());
+ }
+
+ ts.getConf().setSerializedFilterObject(
+ Utilities.serializeObject(ts.getConf().getFilterObject()));
+ }
+ }
+ }
+
+ // No-op processor; kept as a default rule placeholder for the operator walk.
+ public class DefaultRule implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Entry point of the physical optimization: walks the full task tree from the
+ * root tasks with a TaskGraphWalker driven by {@link Serializer}, then returns
+ * the (same) context. Mutates TableScanDesc instances in place.
+ */
+ @Override
+ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+ pctx.getConf();
+
+ // create dispatcher and graph walker
+ Dispatcher disp = new Serializer(pctx);
+ TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+ // get all the tasks nodes from root task
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pctx.getRootTasks());
+
+ // begin to walk through the task tree.
+ ogw.startWalking(topNodes, null);
+ return pctx;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 8ab7cd4..f20393a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
@@ -481,6 +482,13 @@ public class TezCompiler extends TaskCompiler {
&& (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
physicalCtx = new MemoryDecider().resolve(physicalCtx);
}
+
+ // This optimizer will serialize all filters that made it to the
+ // table scan operator to avoid having to do it multiple times on
+ // the backend. If you have a physical optimization that changes
+ // table scans or filters, you have to invoke it before this one.
+ physicalCtx = new SerializeFilter().resolve(physicalCtx);
+
return;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9e9a2a2..6b6ed53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -147,11 +147,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
}
public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) {
- // TODO: we could avoid serialization if it's the same expr. Check?
this.filterExpr = filterExpr;
- if (filterExpr != null) {
- serializedFilterExpr = Utilities.serializeExpression(filterExpr);
- }
}
public Serializable getFilterObject() {
@@ -160,9 +156,6 @@ public class TableScanDesc extends AbstractOperatorDesc {
public void setFilterObject(Serializable filterObject) {
this.filterObject = filterObject;
- if (filterObject != null) {
- serializedFilterObject = Utilities.serializeObject(filterObject);
- }
}
public void setNeededColumnIDs(List<Integer> neededColumnIDs) {
@@ -296,7 +289,15 @@ public class TableScanDesc extends AbstractOperatorDesc {
return serializedFilterExpr;
}
+ public void setSerializedFilterExpr(String serializedFilterExpr) {
+ this.serializedFilterExpr = serializedFilterExpr;
+ }
+
public String getSerializedFilterObject() {
return serializedFilterObject;
}
+
+ public void setSerializedFilterObject(String serializedFilterObject) {
+ this.serializedFilterObject = serializedFilterObject;
+ }
}