Posted to commits@hive.apache.org by pr...@apache.org on 2015/07/07 23:00:25 UTC

hive git commit: HIVE-10940: HiveInputFormat::pushFilters serializes PPD objects for each getRecordReader call (Gunther Hagleitner reviewed by Gopal V, Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master f79059159 -> afeed2994


HIVE-10940: HiveInputFormat::pushFilters serializes PPD objects for each getRecordReader call (Gunther Hagleitner reviewed by Gopal V, Sergey Shelukhin)
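
Context: before this patch, the pushed-down filter expression and filter
object were re-serialized on the read path, once per getRecordReader call,
rather than once per query. The fix moves serialization into a one-time
physical optimizer pass (SerializeFilter) and caches the result on
TableScanDesc. A rough before/after sketch of the read path follows; the
conf key and the exact shape of HiveInputFormat::pushFilters are
illustrative assumptions, not quoted from this patch:

    // Before (conceptually): each getRecordReader() call paid for
    // serializing the filter while populating the job conf:
    jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
        Utilities.serializeExpression(scanDesc.getFilterExpr()));

    // After: SerializeFilter caches the string once at compile time,
    // and the read path simply reuses it:
    jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
        scanDesc.getSerializedFilterExpr());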


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/afeed299
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/afeed299
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/afeed299

Branch: refs/heads/master
Commit: afeed299451a651e5ac957b01999dae564f096a4
Parents: f790591
Author: Prasanth Jayachandran <j....@gmail.com>
Authored: Tue Jul 7 14:00:12 2015 -0700
Committer: Prasanth Jayachandran <j....@gmail.com>
Committed: Tue Jul 7 14:00:12 2015 -0700

----------------------------------------------------------------------
 .../ql/optimizer/physical/SerializeFilter.java  | 178 +++++++++++++++++++
 .../hadoop/hive/ql/parse/TezCompiler.java       |   8 +
 .../hadoop/hive/ql/plan/TableScanDesc.java      |  15 +-
 3 files changed, 194 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
new file mode 100644
index 0000000..f3c1d42
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SerializeFilter.java
@@ -0,0 +1,178 @@
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Stack;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.StatsTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+
+/**
+ * SerializeFilter is a simple physical optimizer that serializes all filter expressions in
+ * TableScan operators.
+ */
+public class SerializeFilter implements PhysicalPlanResolver {
+
+  protected static transient final Log LOG = LogFactory.getLog(SerializeFilter.class);
+
+  public class Serializer implements Dispatcher {
+
+    private final PhysicalContext pctx;
+
+    public Serializer(PhysicalContext pctx) {
+      this.pctx = pctx;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+      throws SemanticException {
+      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
+      if (currTask instanceof StatsTask) {
+        currTask = ((StatsTask) currTask).getWork().getSourceTask();
+      }
+      if (currTask instanceof TezTask) {
+        TezWork work = ((TezTask) currTask).getWork();
+        for (BaseWork w : work.getAllWork()) {
+          evaluateWork(w);
+        }
+      }
+      return null;
+    }
+
+    private void evaluateWork(BaseWork w) throws SemanticException {
+
+      if (w instanceof MapWork) {
+        evaluateMapWork((MapWork) w);
+      } else if (w instanceof ReduceWork) {
+        evaluateReduceWork((ReduceWork) w);
+      } else if (w instanceof MergeJoinWork) {
+        evaluateMergeWork((MergeJoinWork) w);
+      } else {
+        LOG.info("We are not going to evaluate this work type: " + w.getClass().getCanonicalName());
+      }
+    }
+
+    private void evaluateMergeWork(MergeJoinWork w) throws SemanticException {
+      for (BaseWork baseWork : w.getBaseWorkList()) {
+        evaluateOperators(baseWork, pctx);
+      }
+    }
+
+    private void evaluateReduceWork(ReduceWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateMapWork(MapWork w) throws SemanticException {
+      evaluateOperators(w, pctx);
+    }
+
+    private void evaluateOperators(BaseWork w, PhysicalContext pctx) throws SemanticException {
+
+      Dispatcher disp = null;
+      final Set<TableScanOperator> tableScans = new LinkedHashSet<TableScanOperator>();
+
+      Map<Rule, NodeProcessor> rules = new HashMap<Rule, NodeProcessor>();
+      rules.put(new RuleRegExp("TS finder",
+              TableScanOperator.getOperatorName() + "%"), new NodeProcessor() {
+          @Override
+          public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+              Object... nodeOutputs) {
+            tableScans.add((TableScanOperator) nd);
+            return null;
+          }
+        });
+      disp = new DefaultRuleDispatcher(null, rules, null);
+
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.addAll(w.getAllRootOperators());
+
+      LinkedHashMap<Node, Object> nodeOutput = new LinkedHashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      for (TableScanOperator ts: tableScans) {
+        if (ts.getConf() != null && ts.getConf().getFilterExpr() != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Serializing: " + ts.getConf().getFilterExpr().getExprString());
+          }
+          ts.getConf().setSerializedFilterExpr(
+            Utilities.serializeExpression(ts.getConf().getFilterExpr()));
+        }
+
+        if (ts.getConf() != null && ts.getConf().getFilterObject() != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Serializing: " + ts.getConf().getFilterObject());
+          }
+
+          ts.getConf().setSerializedFilterObject(
+            Utilities.serializeObject(ts.getConf().getFilterObject()));
+        }
+      }
+    }
+
+    public class DefaultRule implements NodeProcessor {
+
+      @Override
+      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+          Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    }
+  }
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+
+    pctx.getConf();
+
+    // create dispatcher and graph walker
+    Dispatcher disp = new Serializer(pctx);
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the tasks nodes from root task
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+
+    // begin to walk through the task tree.
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+}
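
Note: the pass above only produces the cached strings; task-side consumers
decode them. A minimal sketch of the round trip this enables, assuming
Utilities.deserializeExpression is the matching decoder for
Utilities.serializeExpression:

    // Compile time, inside SerializeFilter:
    String serialized = Utilities.serializeExpression(desc.getFilterExpr());
    desc.setSerializedFilterExpr(serialized);

    // Task side (illustrative): decode the cached string rather than
    // re-serializing the expression for every record reader:
    ExprNodeGenericFuncDesc expr = Utilities.deserializeExpression(serialized);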

http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index 8ab7cd4..f20393a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.MemoryDecider;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.SerializeFilter;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
@@ -481,6 +482,13 @@ public class TezCompiler extends TaskCompiler {
         && (conf.getBoolVar(HiveConf.ConfVars.HIVEUSEHYBRIDGRACEHASHJOIN))) {
       physicalCtx = new MemoryDecider().resolve(physicalCtx);
     }
+
+    //  This optimizer will serialize all filters that made it to the
+    //  table scan operator to avoid having to do it multiple times on
+    //  the backend. If you have a physical optimization that changes
+    //  table scans or filters, you have to invoke it before this one.
+    physicalCtx = new SerializeFilter().resolve(physicalCtx);
+
     return;
   }
 }
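
Note: the ordering constraint in the comment above is load-bearing. Since
the TableScanDesc setters no longer serialize as a side effect (see the
next file), a pass that rewrites filters after SerializeFilter has run
would leave a stale cached string behind. A hypothetical illustration:

    // Any pass running after SerializeFilter that does this...
    ts.getConf().setFilterExpr(rewrittenExpr);
    // ...leaves getSerializedFilterExpr() pointing at the old expression,
    // because nothing re-runs the serialization step.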

http://git-wip-us.apache.org/repos/asf/hive/blob/afeed299/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 9e9a2a2..6b6ed53 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -147,11 +147,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
   }
 
   public void setFilterExpr(ExprNodeGenericFuncDesc filterExpr) {
-    // TODO: we could avoid serialization if it's the same expr. Check?
     this.filterExpr = filterExpr;
-    if (filterExpr != null) {
-      serializedFilterExpr = Utilities.serializeExpression(filterExpr);
-    }
   }
 
   public Serializable getFilterObject() {
@@ -160,9 +156,6 @@ public class TableScanDesc extends AbstractOperatorDesc {
 
   public void setFilterObject(Serializable filterObject) {
     this.filterObject = filterObject;
-    if (filterObject != null) {
-      serializedFilterObject = Utilities.serializeObject(filterObject);
-    }
   }
 
   public void setNeededColumnIDs(List<Integer> neededColumnIDs) {
@@ -296,7 +289,15 @@ public class TableScanDesc extends AbstractOperatorDesc {
     return serializedFilterExpr;
   }
 
+  public void setSerializedFilterExpr(String serializedFilterExpr) {
+    this.serializedFilterExpr = serializedFilterExpr;
+  }
+
   public String getSerializedFilterObject() {
     return serializedFilterObject;
   }
+
+  public void setSerializedFilterObject(String serializedFilterObject) {
+    this.serializedFilterObject = serializedFilterObject;
+  }
 }
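
With the eager serialization removed from the setters, TableScanDesc treats
the serialized forms as plain cached fields that SerializeFilter populates
exactly once. A minimal sketch of the new contract (variable names are
illustrative):

    TableScanDesc desc = tableScan.getConf();
    desc.setFilterExpr(filterExpr);        // no longer serializes as a side effect
    desc.setFilterObject(filterObject);    // likewise

    // Serialization happens once, in the SerializeFilter pass:
    desc.setSerializedFilterExpr(Utilities.serializeExpression(filterExpr));
    desc.setSerializedFilterObject(Utilities.serializeObject(filterObject));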