You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2008/12/23 00:24:34 UTC

svn commit: r728824 - in /hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: lib/Node.java optimizer/ColumnPrunerProcCtx.java optimizer/ColumnPrunerProcFactory.java parse/ASTNode.java

Author: zshao
Date: Mon Dec 22 15:24:34 2008
New Revision: 728824

URL: http://svn.apache.org/viewvc?rev=728824&view=rev
Log:
HIVE-186. Refactor code to use a single graph, nodeprocessor, dispatcher and rule abstraction. (Ashish Thusoo via zshao)

Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Node.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Node.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Node.java?rev=728824&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Node.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Node.java Mon Dec 22 15:24:34 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Vector;
+
+/**
+ * This interface defines the functions needed by the walkers and dispatchers.
+ * These are implemented by the node of the graph that needs to be walked.
+ */
+public interface Node {
+  
+  /**
+   * Gets the vector of children nodes. This is used in the graph walker algorithms.
+   * ASTNode returns null rather than an empty vector when it has no children.
+   * @return the child nodes; may be null for a leaf, so callers have to check
+   */
+  public Vector<Node> getChildren();
+  
+  /**
+   * Gets the name of the node. This is used in the rule dispatchers.
+   * ASTNode, for example, returns the type of its token rendered as a decimal string.
+   * @return the name of this node
+   */
+  public String getName();
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java?rev=728824&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java Mon Dec 22 15:24:34 2008
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+
+/**
+ * This class implements the processor context for Column Pruner.
+ */
+public class ColumnPrunerProcCtx extends NodeProcessorCtx {
+  
+  private  Map<Operator<? extends Serializable>,List<String>> prunedColLists;
+  
+  private HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+    
+  public ColumnPrunerProcCtx(HashMap<Operator<? extends Serializable>, OpParseContext> opToParseContextMap) {
+    prunedColLists = new HashMap<Operator<? extends Serializable>, List<String>>();
+    this.opToParseCtxMap = opToParseContextMap;
+  }
+
+  /**
+   * @return the prunedColLists
+   */
+  public List<String> getPrunedColList(Operator<? extends Serializable> op) {
+    return prunedColLists.get(op);
+  }
+
+  public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
+    return opToParseCtxMap;
+  }
+  
+  public Map<Operator<? extends Serializable>, List<String>> getPrunedColLists() {
+    return prunedColLists;
+  }
+  
+  /**
+   * Creates the list of internal column names(these names are used in the RowResolver and
+   * are different from the external column names) that are needed in the subtree. These columns 
+   * eventually have to be selected from the table scan.
+   * 
+   * @param curOp The root of the operator subtree.
+   * @return List<String> of the internal column names.
+   * @throws SemanticException
+   */
+  public List<String> genColLists(Operator<? extends Serializable> curOp) throws SemanticException {
+    List<String> colList = new ArrayList<String>();
+    if(curOp.getChildOperators() != null) {
+      for(Operator<? extends Serializable> child: curOp.getChildOperators())
+        colList = Utilities.mergeUniqElems(colList, prunedColLists.get(child));
+    }
+    return colList;
+  }
+  
+  /**
+   * Creates the list of internal column names from select expressions in a select operator.
+   * This function is used for the select operator instead of the genColLists function (which is
+   * used by the rest of the operators).
+   * 
+   * @param op The select operator.
+   * @return List<String> of the internal column names.
+   */
+  public List<String> getColsFromSelectExpr(SelectOperator op) {
+    List<String> cols = new ArrayList<String>();
+    selectDesc conf = op.getConf();
+    ArrayList<exprNodeDesc> exprList = conf.getColList();
+    for (exprNodeDesc expr : exprList)
+      cols = Utilities.mergeUniqElems(cols, expr.getCols());
+    return cols;
+  }
+
+  /**
+   * Creates the list of internal column names for select * expressions.
+   * 
+   * @param op The select operator.
+   * @param colList The list of internal column names returned by the children of the select operator.
+   * @return List<String> of the internal column names.
+   */
+  public List<String> getSelectColsFromChildren(SelectOperator op, List<String> colList) {
+    List<String> cols = new ArrayList<String>();
+    selectDesc conf = op.getConf();
+    ArrayList<exprNodeDesc> selectExprs = conf.getColList();
+
+    for (String col : colList) {
+      // col is the internal name i.e. position within the expression list
+      exprNodeDesc expr = selectExprs.get(Integer.parseInt(col));
+      cols = Utilities.mergeUniqElems(cols, expr.getCols());
+    }
+    return cols;
+  }
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=728824&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Mon Dec 22 15:24:34 2008
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.aggregationDesc;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.groupByDesc;
+import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+
+/**
+ * Factory for the node processors used by ColumnPruner; each one stores a column list for the operator it visits.
+ */
+public class ColumnPrunerProcFactory {
+
+  /**
+   * Node Processor for Column Pruning on Filter Operators: predicate columns merged with the children's columns.
+   */
+  public static class ColumnPrunerFilterProc implements NodeProcessor {  
+    public void process(Node nd, NodeProcessorCtx ctx) throws SemanticException {
+      FilterOperator op = (FilterOperator)nd;
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+      exprNodeDesc condn = op.getConf().getPredicate();
+      // get list of columns used in the filter
+      List<String> cl = condn.getCols();
+      // merge it with the downstream col list and record the result for this operator
+      cppCtx.getPrunedColLists().put(op, Utilities.mergeUniqElems(cppCtx.genColLists(op), cl));
+    }
+  }
+  
+  /**
+   * Factory method to get a new ColumnPrunerFilterProc instance.
+   * @return a new ColumnPrunerFilterProc
+   */
+  public static ColumnPrunerFilterProc getFilterProc() {
+    return new ColumnPrunerFilterProc();
+  }
+  
+  /**
+   * Node Processor for Column Pruning on Group By Operators: columns of the keys and of the aggregation parameters.
+   */
+  public static class ColumnPrunerGroupByProc implements NodeProcessor {
+    public void process(Node nd, NodeProcessorCtx ctx) throws SemanticException {
+      GroupByOperator op = (GroupByOperator)nd;
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+      List<String> colLists = new ArrayList<String>();
+      groupByDesc conf = op.getConf();
+      ArrayList<exprNodeDesc> keys = conf.getKeys(); // first, the columns referenced by the group-by keys
+      for (exprNodeDesc key : keys)
+        colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+      // next, the columns referenced by the parameters of every aggregation
+      ArrayList<aggregationDesc> aggrs = conf.getAggregators();
+      for (aggregationDesc aggr : aggrs) { 
+        ArrayList<exprNodeDesc> params = aggr.getParameters();
+        for (exprNodeDesc param : params) 
+          colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+      }
+
+      cppCtx.getPrunedColLists().put(op, colLists); // the children's column lists are not merged in here
+    }
+  }
+
+  /**
+   * Factory method to get a new ColumnPrunerGroupByProc instance.
+   * @return a new ColumnPrunerGroupByProc
+   */
+  public static ColumnPrunerGroupByProc getGroupByProc() {
+    return new ColumnPrunerGroupByProc();
+  }
+
+  /**
+   * The Default Node Processor for Column Pruning: records the merged column lists of the operator's children.
+   */
+  public static class ColumnPrunerDefaultProc implements NodeProcessor {
+    public void process(Node nd, NodeProcessorCtx ctx) throws SemanticException {
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+      cppCtx.getPrunedColLists().put((Operator<? extends Serializable>)nd, // NOTE(review): assumes nd is an Operator
+          cppCtx.genColLists((Operator<? extends Serializable>)nd));
+    }
+  }
+
+  /**
+   * Factory method to get a new ColumnPrunerDefaultProc instance.
+   * @return a new ColumnPrunerDefaultProc
+   */
+  public static ColumnPrunerDefaultProc getDefaultProc() {
+    return new ColumnPrunerDefaultProc();
+  }
+  
+  /**
+   * The Node Processor for Column Pruning on Reduce Sink Operators: key columns plus, depending on the child, value columns.
+   */
+  public static class ColumnPrunerReduceSinkProc implements NodeProcessor {
+    public void process(Node nd, NodeProcessorCtx ctx) throws SemanticException {
+      ReduceSinkOperator op = (ReduceSinkOperator)nd;
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+      HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = 
+          cppCtx.getOpToParseCtxMap();
+      RowResolver redSinkRR = opToParseCtxMap.get(op).getRR();
+      reduceSinkDesc conf = op.getConf();
+      List<Operator<? extends Serializable>> childOperators = op.getChildOperators();
+      List<Operator<? extends Serializable>> parentOperators = op.getParentOperators();
+      List<String> childColLists = new ArrayList<String>();
+      // NOTE(review): childOperators is not null-checked here, unlike ColumnPrunerProcCtx.genColLists - confirm it cannot be null
+      for(Operator<? extends Serializable> child: childOperators)
+        childColLists = Utilities.mergeUniqElems(childColLists, cppCtx.getPrunedColLists().get(child));
+      // the columns referenced by the key expressions are added in both branches below
+      List<String> colLists = new ArrayList<String>();
+      ArrayList<exprNodeDesc> keys = conf.getKeyCols();
+      for (exprNodeDesc key : keys)
+        colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+      // single Join child: instead of all value columns, add only what the join's column list maps to
+      if ((childOperators.size() == 1) && (childOperators.get(0) instanceof JoinOperator)) {
+        assert parentOperators.size() == 1; // only evaluated when assertions are enabled
+        Operator<? extends Serializable> par = parentOperators.get(0);
+        RowResolver parRR = opToParseCtxMap.get(par).getRR();
+        RowResolver childRR = opToParseCtxMap.get(childOperators.get(0)).getRR();
+        // NOTE(review): nm is presumably the {table alias, column alias} pair - confirm against RowResolver.reverseLookup
+        for (String childCol : childColLists) {
+          String [] nm = childRR.reverseLookup(childCol);
+          ColumnInfo cInfo = redSinkRR.get(nm[0],nm[1]); // names this reduce sink's resolver does not know are skipped
+          if (cInfo != null) {
+            cInfo = parRR.get(nm[0], nm[1]); // NOTE(review): result is not null-checked before use
+            if (!colLists.contains(cInfo.getInternalName()))
+              colLists.add(cInfo.getInternalName());
+          }
+        }
+      }
+      else {
+        // Reduce Sink contains the columns needed - no need to aggregate from children
+        ArrayList<exprNodeDesc> vals = conf.getValueCols();
+        for (exprNodeDesc val : vals)
+          colLists = Utilities.mergeUniqElems(colLists, val.getCols());
+      }
+
+      cppCtx.getPrunedColLists().put(op, colLists);
+    }
+  }
+
+  /**
+   * The Factory method to get a new ColumnPrunerReduceSinkProc instance.
+   * @return a new ColumnPrunerReduceSinkProc
+   */
+  public static ColumnPrunerReduceSinkProc getReduceSinkProc() {
+    return new ColumnPrunerReduceSinkProc();
+  }
+
+  /**
+   * The Node Processor for Column Pruning on Select Operators.
+   */
+  public static class ColumnPrunerSelectProc implements NodeProcessor {
+    public void process(Node nd, NodeProcessorCtx ctx) throws SemanticException {
+      SelectOperator op = (SelectOperator)nd;
+      ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+      List<String> cols = new ArrayList<String>();
+
+      if(op.getChildOperators() != null) {
+        for(Operator<? extends Serializable> child: op.getChildOperators()) {
+          // If one of my children is a FileSink or Script, return all columns.
+          // Without this early return, a bug in ReduceSink to Extract edge column pruning will manifest;
+          // that bug should be fixed before this check is removed.
+          if ((child instanceof FileSinkOperator) || (child instanceof ScriptOperator)) {
+            cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op));
+            return;
+          }
+          cols = Utilities.mergeUniqElems(cols, cppCtx.getPrunedColLists().get(child));
+        }
+      }
+      // no FileSink/Script child was found: cols now holds the merged column lists of the children
+      selectDesc conf = op.getConf();
+      if (conf.isSelectStar() && !cols.isEmpty()) {
+        // The input to the select does not matter. Go over the expressions 
+        // and return the ones which have a marked column
+        cppCtx.getPrunedColLists().put(op, cppCtx.getSelectColsFromChildren(op, cols));
+        return;
+      }
+      cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op)); // otherwise: columns of every select expression
+    }
+  }
+
+  /**
+   * The Factory method to get a new ColumnPrunerSelectProc instance.
+   * @return a new ColumnPrunerSelectProc
+   */
+  public static ColumnPrunerSelectProc getSelectProc() {
+    return new ColumnPrunerSelectProc();
+  }
+  
+}

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java?rev=728824&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java Mon Dec 22 15:24:34 2008
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Vector;
+import org.antlr.runtime.tree.CommonTree;
+import org.antlr.runtime.Token;
+import org.apache.hadoop.hive.ql.lib.Node;
+
+/**
+ * ANTLR CommonTree that also implements Node so the walkers and dispatchers can process it.
+ * @author athusoo
+ */
+public class ASTNode extends CommonTree implements Node {
+
+  /**
+   * Constructor
+   * @param t Token for the CommonTree Node
+   */
+  public ASTNode(Token t) {
+    super(t);
+  }
+  
+  /**
+   * @return the children of this node, in order, or null (not an empty vector) for a leaf
+   */
+  @Override
+  public Vector<Node> getChildren() {
+    final int count = super.getChildCount();
+    if (count == 0) {
+      return null; // kept as-is: null, not an empty vector, signals a leaf
+    }
+    // Presized to the known child count so the vector never has to grow.
+    Vector<Node> ret_vec = new Vector<Node>(count);
+    for (int i = 0; i < count; ++i) {
+      ret_vec.add((Node)super.getChild(i)); // ClassCastException if a child is not a Node
+    }
+    return ret_vec;
+  }
+
+  /**
+   * @return the type of this node's token, formatted as a decimal string without boxing
+   */
+  @Override
+  public String getName() {
+    return Integer.toString(super.getToken().getType());
+  }
+}