Posted to commits@hive.apache.org by jp...@apache.org on 2014/12/08 06:08:26 UTC

svn commit: r1643736 - in /hive/trunk: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/

Author: jpullokk
Date: Mon Dec  8 05:08:25 2014
New Revision: 1643736

URL: http://svn.apache.org/r1643736
Log:
Rolled back to 1643551

Removed:
    hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx_cbo_1.q
    hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx_cbo_2.q
    hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx_cbo_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx_cbo_2.q.out
Modified:
    hive/trunk/itests/src/test/resources/testconfiguration.properties
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
    hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out

Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Mon Dec  8 05:08:25 2014
@@ -31,8 +31,6 @@ minimr.query.files=auto_sortmerge_join_1
   optrstat_groupby.q,\
   parallel_orderby.q,\
   ql_rewrite_gbtoidx.q,\
-  ql_rewrite_gbtoidx_cbo_1.q,\
-  ql_rewrite_gbtoidx_cbo_2.q,\
   quotedid_smb.q,\
   reduce_deduplicate.q,\
   remote_script.q,\

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java Mon Dec  8 05:08:25 2014
@@ -21,16 +21,17 @@ package org.apache.hadoop.hive.ql.optimi
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -62,31 +63,47 @@ public final class RewriteCanApplyCtx im
   }
 
   // Rewrite Variables
-  private boolean selClauseColsFetchException = false;
+  private int aggFuncCnt = 0;
   private boolean queryHasGroupBy = false;
   private boolean aggFuncIsNotCount = false;
-  private boolean aggParameterException = false;
-
-  //The most important, indexKey
-  private String indexKey;
+  private boolean aggFuncColsFetchException = false;
+  private boolean whrClauseColsFetchException = false;
+  private boolean selClauseColsFetchException = false;
+  private boolean gbyKeysFetchException = false;
+  private boolean countOnAllCols = false;
+  private boolean countOfOne = false;
+  private boolean queryHasMultipleTables = false;
+
+  //Data structures that are populated in the RewriteCanApplyProcFactory
+  //methods to check if the index key meets all criteria
+  private Set<String> selectColumnsList = new LinkedHashSet<String>();
+  private Set<String> predicateColumnsList = new LinkedHashSet<String>();
+  private Set<String> gbKeyNameList = new LinkedHashSet<String>();
+  private Set<String> aggFuncColList = new LinkedHashSet<String>();
 
   private final ParseContext parseContext;
   private String alias;
   private String baseTableName;
   private String indexTableName;
   private String aggFunction;
-  
-  private TableScanOperator tableScanOperator;
-  private List<SelectOperator> selectOperators;
-  private List<GroupByOperator> groupByOperators;
 
   void resetCanApplyCtx(){
+    setAggFuncCnt(0);
     setQueryHasGroupBy(false);
     setAggFuncIsNotCount(false);
+    setAggFuncColsFetchException(false);
+    setWhrClauseColsFetchException(false);
     setSelClauseColsFetchException(false);
+    setGbyKeysFetchException(false);
+    setCountOnAllCols(false);
+    setCountOfOne(false);
+    setQueryHasMultipleTables(false);
+    selectColumnsList.clear();
+    predicateColumnsList.clear();
+    gbKeyNameList.clear();
+    aggFuncColList.clear();
     setBaseTableName("");
     setAggFunction("");
-    setIndexKey("");
   }
 
   public boolean isQueryHasGroupBy() {
@@ -117,6 +134,22 @@ public final class RewriteCanApplyCtx im
     return aggFunction;
   }
 
+  public void setAggFuncColsFetchException(boolean aggFuncColsFetchException) {
+    this.aggFuncColsFetchException = aggFuncColsFetchException;
+  }
+
+  public boolean isAggFuncColsFetchException() {
+    return aggFuncColsFetchException;
+  }
+
+  public void setWhrClauseColsFetchException(boolean whrClauseColsFetchException) {
+    this.whrClauseColsFetchException = whrClauseColsFetchException;
+  }
+
+  public boolean isWhrClauseColsFetchException() {
+    return whrClauseColsFetchException;
+  }
+
   public void setSelClauseColsFetchException(boolean selClauseColsFetchException) {
     this.selClauseColsFetchException = selClauseColsFetchException;
   }
@@ -125,6 +158,78 @@ public final class RewriteCanApplyCtx im
     return selClauseColsFetchException;
   }
 
+  public void setGbyKeysFetchException(boolean gbyKeysFetchException) {
+    this.gbyKeysFetchException = gbyKeysFetchException;
+  }
+
+  public boolean isGbyKeysFetchException() {
+    return gbyKeysFetchException;
+  }
+
+  public void setCountOnAllCols(boolean countOnAllCols) {
+    this.countOnAllCols = countOnAllCols;
+  }
+
+  public boolean isCountOnAllCols() {
+    return countOnAllCols;
+  }
+
+  public void setCountOfOne(boolean countOfOne) {
+    this.countOfOne = countOfOne;
+  }
+
+  public boolean isCountOfOne() {
+    return countOfOne;
+  }
+
+  public void setQueryHasMultipleTables(boolean queryHasMultipleTables) {
+    this.queryHasMultipleTables = queryHasMultipleTables;
+  }
+
+  public boolean isQueryHasMultipleTables() {
+    return queryHasMultipleTables;
+  }
+
+  public Set<String> getSelectColumnsList() {
+    return selectColumnsList;
+  }
+
+  public void setSelectColumnsList(Set<String> selectColumnsList) {
+    this.selectColumnsList = selectColumnsList;
+  }
+
+  public Set<String> getPredicateColumnsList() {
+    return predicateColumnsList;
+  }
+
+  public void setPredicateColumnsList(Set<String> predicateColumnsList) {
+    this.predicateColumnsList = predicateColumnsList;
+  }
+
+  public Set<String> getGbKeyNameList() {
+    return gbKeyNameList;
+  }
+
+  public void setGbKeyNameList(Set<String> gbKeyNameList) {
+    this.gbKeyNameList = gbKeyNameList;
+  }
+
+  public Set<String> getAggFuncColList() {
+    return aggFuncColList;
+  }
+
+  public void setAggFuncColList(Set<String> aggFuncColList) {
+    this.aggFuncColList = aggFuncColList;
+  }
+
+   public int getAggFuncCnt() {
+    return aggFuncCnt;
+  }
+
+  public void setAggFuncCnt(int aggFuncCnt) {
+    this.aggFuncCnt = aggFuncCnt;
+  }
+
   public String getAlias() {
     return alias;
   }
@@ -153,6 +258,15 @@ public final class RewriteCanApplyCtx im
     return parseContext;
   }
 
+  public Set<String> getAllColumns() {
+    Set<String> allColumns = new LinkedHashSet<String>(selectColumnsList);
+    allColumns.addAll(predicateColumnsList);
+    allColumns.addAll(gbKeyNameList);
+    allColumns.addAll(aggFuncColList);
+    return allColumns;
+  }
+
+
   /**
    * This method walks all the nodes starting from topOp TableScanOperator node
    * and invokes methods from {@link RewriteCanApplyProcFactory} for each of the rules
@@ -168,14 +282,10 @@ public final class RewriteCanApplyCtx im
   void populateRewriteVars(TableScanOperator topOp)
     throws SemanticException{
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    //^TS%[(SEL%)|(FIL%)]*GRY%[(FIL%)]*RS%[(FIL%)]*GRY%
-    opRules.put(
-        new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%[("
-            + SelectOperator.getOperatorName() + "%)|(" + FilterOperator.getOperatorName() + "%)]*"
-            + GroupByOperator.getOperatorName() + "%[" + FilterOperator.getOperatorName() + "%]*"
-            + ReduceSinkOperator.getOperatorName() + "%[" + FilterOperator.getOperatorName()
-            + "%]*" + GroupByOperator.getOperatorName() + "%"),
-        RewriteCanApplyProcFactory.canApplyOnTableScanOperator(topOp));
+    opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"),
+        RewriteCanApplyProcFactory.canApplyOnFilterOperator(topOp));
+    opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + "%"),
+        RewriteCanApplyProcFactory.canApplyOnGroupByOperator(topOp));
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -213,53 +323,67 @@ public final class RewriteCanApplyCtx im
 
   //Map for base table to index table mapping
   //TableScan operator for base table will be modified to read from index table
-  private final Map<String, String> baseToIdxTableMap = new HashMap<String, String>();;
-
-  public void addTable(String baseTableName, String indexTableName) {
-    baseToIdxTableMap.put(baseTableName, indexTableName);
-  }
-
-  public String findBaseTable(String baseTableName) {
-    return baseToIdxTableMap.get(baseTableName);
-  }
+  private final Map<String, String> baseToIdxTableMap =
+    new HashMap<String, String>();;
 
-  public String getIndexKey() {
-    return indexKey;
-  }
-
-  public void setIndexKey(String indexKey) {
-    this.indexKey = indexKey;
-  }
-
-  public TableScanOperator getTableScanOperator() {
-    return tableScanOperator;
-  }
 
-  public void setTableScanOperator(TableScanOperator tableScanOperator) {
-    this.tableScanOperator = tableScanOperator;
-  }
-
-  public List<SelectOperator> getSelectOperators() {
-    return selectOperators;
-  }
+  public void addTable(String baseTableName, String indexTableName) {
+     baseToIdxTableMap.put(baseTableName, indexTableName);
+   }
 
-  public void setSelectOperators(List<SelectOperator> selectOperators) {
-    this.selectOperators = selectOperators;
-  }
+   public String findBaseTable(String baseTableName)  {
+     return baseToIdxTableMap.get(baseTableName);
+   }
+
+
+  boolean isIndexUsableForQueryBranchRewrite(Index index, Set<String> indexKeyNames){
+
+    //--------------------------------------------
+    //Check if all columns in select list are part of index key columns
+    if (!indexKeyNames.containsAll(selectColumnsList)) {
+      LOG.info("Select list has non index key column : " +
+          " Cannot use index " + index.getIndexName());
+      return false;
+    }
 
-  public List<GroupByOperator> getGroupByOperators() {
-    return groupByOperators;
-  }
+    //--------------------------------------------
+    // Check if all columns in where predicate are part of index key columns
+    if (!indexKeyNames.containsAll(predicateColumnsList)) {
+      LOG.info("Predicate column ref list has non index key column : " +
+          " Cannot use index  " + index.getIndexName());
+      return false;
+    }
 
-  public void setGroupByOperators(List<GroupByOperator> groupByOperators) {
-    this.groupByOperators = groupByOperators;
-  }
+      //--------------------------------------------
+      // For group by, we need to check if all keys are from index columns
+      // itself. Here GB key order can be different than index columns but that does
+      // not really matter for final result.
+      if (!indexKeyNames.containsAll(gbKeyNameList)) {
+        LOG.info("Group by key has some non-indexed columns, " +
+            " Cannot use index  " + index.getIndexName());
+        return false;
+      }
 
-  public void setAggParameterException(boolean aggParameterException) {
-    this.aggParameterException = aggParameterException;
-  }
+      // If we have agg function (currently only COUNT is supported), check if its inputs are
+      // from index. we currently support only that.
+      if (aggFuncColList.size() > 0)  {
+        if (!indexKeyNames.containsAll(aggFuncColList)){
+          LOG.info("Agg Func input is not present in index key columns. Currently " +
+              "only agg func on index columns are supported by rewrite optimization");
+          return false;
+        }
+      }
 
-  public boolean isAggParameterException() {
-    return aggParameterException;
+    //Now that we are good to do this optimization, set parameters in context
+    //which would be used by transformation procedure as inputs.
+    if(queryHasGroupBy
+        && aggFuncCnt == 1
+        && !aggFuncIsNotCount){
+      addTable(baseTableName, index.getIndexTableName());
+    }else{
+      LOG.info("No valid criteria met to apply rewrite.");
+      return false;
+    }
+    return true;
   }
 }
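
The reinstated isIndexUsableForQueryBranchRewrite() above boils down to set containment: the index key columns must cover every column the query branch touches in its select list, its predicates, its group-by keys, and its aggregation arguments. A minimal standalone sketch of that test follows (hypothetical class and method names, not Hive API):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class IndexCoverageSketch {
      // An index is usable for the rewrite only when its key columns
      // contain every column referenced by the query branch.
      static boolean covers(Set<String> indexKeys, Set<String> selectCols,
          Set<String> predicateCols, Set<String> gbKeys, Set<String> aggCols) {
        return indexKeys.containsAll(selectCols)
            && indexKeys.containsAll(predicateCols)
            && indexKeys.containsAll(gbKeys)
            && indexKeys.containsAll(aggCols);
      }

      public static void main(String[] args) {
        Set<String> keys = new LinkedHashSet<String>(Arrays.asList("key"));
        // SELECT key, count(key) FROM tbl GROUP BY key -> rewritable
        System.out.println(covers(keys,
            new LinkedHashSet<String>(Arrays.asList("key")),
            new LinkedHashSet<String>(),
            new LinkedHashSet<String>(Arrays.asList("key")),
            new LinkedHashSet<String>(Arrays.asList("key"))));  // true
        // a predicate on a non-indexed column defeats the rewrite
        System.out.println(covers(keys,
            new LinkedHashSet<String>(Arrays.asList("key")),
            new LinkedHashSet<String>(Arrays.asList("value")),
            new LinkedHashSet<String>(Arrays.asList("key")),
            new LinkedHashSet<String>(Arrays.asList("key"))));  // false
      }
    }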

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java Mon Dec  8 05:08:25 2014
@@ -18,9 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer.index;
 
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -28,12 +27,13 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 
@@ -43,74 +43,154 @@ import java.util.Stack;
  *
  */
 public final class RewriteCanApplyProcFactory {
-  public static CheckTableScanProc canApplyOnTableScanOperator(TableScanOperator topOp) {
-    return new CheckTableScanProc();
-  }
 
-  private static class CheckTableScanProc implements NodeProcessor {
-    public CheckTableScanProc() {
+  /**
+   * Check for conditions in FilterOperator that do not meet rewrite criteria.
+   */
+  private static class CheckFilterProc implements NodeProcessor {
+
+    private TableScanOperator topOp;
+
+    public CheckFilterProc(TableScanOperator topOp) {
+      this.topOp = topOp;
     }
 
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs)
-        throws SemanticException {
-      RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx) ctx;
-      for (Node node : stack) {
-        // For table scan operator,
-        // check ReferencedColumns to make sure that only the index column is
-        // selected for the following operators.
-        if (node instanceof TableScanOperator) {
-          TableScanOperator ts = (TableScanOperator) node;
-          canApplyCtx.setTableScanOperator(ts);
-          List<String> selectColumns = ts.getConf().getReferencedColumns();
-          if (selectColumns == null || selectColumns.size() != 1) {
-            canApplyCtx.setSelClauseColsFetchException(true);
-            return null;
-          } else {
-            canApplyCtx.setIndexKey(selectColumns.get(0));
-          }
-        } else if (node instanceof SelectOperator) {
-          // For select operators in the stack, we just add them
-          if (canApplyCtx.getSelectOperators() == null) {
-            canApplyCtx.setSelectOperators(new ArrayList<SelectOperator>());
-          }
-          canApplyCtx.getSelectOperators().add((SelectOperator) node);
-        } else if (node instanceof GroupByOperator) {
-          if (canApplyCtx.getGroupByOperators() == null) {
-            canApplyCtx.setGroupByOperators(new ArrayList<GroupByOperator>());
-          }
-          // According to the pre-order,
-          // the first GroupbyOperator is the one before RS
-          // and the second one is the one after RS
-          GroupByOperator operator = (GroupByOperator) node;
-          canApplyCtx.getGroupByOperators().add(operator);
-          if (!canApplyCtx.isQueryHasGroupBy()) {
-            canApplyCtx.setQueryHasGroupBy(true);
-            GroupByDesc conf = operator.getConf();
-            List<AggregationDesc> aggrList = conf.getAggregators();
-            if (aggrList == null || aggrList.size() != 1
-                || !("count".equals(aggrList.get(0).getGenericUDAFName()))) {
-              // In the current implementation, we make sure that only count is
-              // in the function
-              canApplyCtx.setAggFuncIsNotCount(true);
-              return null;
-            } else {
-              List<ExprNodeDesc> para = aggrList.get(0).getParameters();
-              if (para == null || para.size() == 0 || para.size() > 1) {
-                canApplyCtx.setAggParameterException(true);
-                return null;
-              } else {
-                ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator,
-                    (Operator<OperatorDesc>) stack.get(0));
-                if (!(expr instanceof ExprNodeColumnDesc)) {
-                  canApplyCtx.setAggParameterException(true);
-                  return null;
-                }
-              }
-            }
-          }
-        }
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      FilterOperator operator = (FilterOperator)nd;
+      RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
+      FilterDesc conf = operator.getConf();
+      //The filter operator should have a predicate of ExprNodeGenericFuncDesc type.
+      //This represents the comparison operator
+      ExprNodeDesc oldengfd = conf.getPredicate();
+      if(oldengfd == null){
+        canApplyCtx.setWhrClauseColsFetchException(true);
+        return null;
+      }
+      ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(oldengfd, operator, topOp);
+      if (backtrack == null) {
+        canApplyCtx.setWhrClauseColsFetchException(true);
+        return null;
+      }
+      //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later
+      //if index keys contain all filter predicate columns and vice-a-versa
+      for (String col : backtrack.getCols()) {
+        canApplyCtx.getPredicateColumnsList().add(col);
       }
       return null;
     }
   }
+
+ public static CheckFilterProc canApplyOnFilterOperator(TableScanOperator topOp) {
+    return new CheckFilterProc(topOp);
+  }
+
+   /**
+   * Check for conditions in GroupByOperator that do not meet rewrite criteria.
+   *
+   */
+  private static class CheckGroupByProc implements NodeProcessor {
+
+     private TableScanOperator topOp;
+
+     public CheckGroupByProc(TableScanOperator topOp) {
+       this.topOp = topOp;
+     }
+
+     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+         Object... nodeOutputs) throws SemanticException {
+       GroupByOperator operator = (GroupByOperator)nd;
+       RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
+       //for each group-by clause in query, only one GroupByOperator of the
+       //GBY-RS-GBY sequence is stored in  getGroupOpToInputTables
+       //we need to process only this operator
+       //Also, we do not rewrite for cases when same query branch has multiple group-by constructs
+       if(canApplyCtx.getParseContext().getGroupOpToInputTables().containsKey(operator) &&
+           !canApplyCtx.isQueryHasGroupBy()){
+
+         canApplyCtx.setQueryHasGroupBy(true);
+         GroupByDesc conf = operator.getConf();
+         List<AggregationDesc> aggrList = conf.getAggregators();
+         if(aggrList != null && aggrList.size() > 0){
+             for (AggregationDesc aggregationDesc : aggrList) {
+               canApplyCtx.setAggFuncCnt(canApplyCtx.getAggFuncCnt() + 1);
+               //In the current implementation, we do not support more than 1 agg funcs in group-by
+               if(canApplyCtx.getAggFuncCnt() > 1) {
+                 return false;
+               }
+               String aggFunc = aggregationDesc.getGenericUDAFName();
+               if(!("count".equals(aggFunc))){
+                 canApplyCtx.setAggFuncIsNotCount(true);
+                 return false;
+               }
+               List<ExprNodeDesc> para = aggregationDesc.getParameters();
+               //for a valid aggregation, it needs to have non-null parameter list
+               if (para == null) {
+                 canApplyCtx.setAggFuncColsFetchException(true);
+               } else if (para.size() == 0) {
+                 //count(*) case
+                 canApplyCtx.setCountOnAllCols(true);
+                 canApplyCtx.setAggFunction("_count_of_all");
+               } else if (para.size() == 1) {
+                 ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator, topOp);
+                 if (expr instanceof ExprNodeColumnDesc){
+                   //Add the columns to RewriteCanApplyCtx's selectColumnsList list
+                   //to check later if index keys contain all select clause columns
+                   //and vice-a-versa. We get the select column 'actual' names only here
+                   //if we have a agg func along with group-by
+                   //SelectOperator has internal names in its colList data structure
+                   canApplyCtx.getSelectColumnsList().add(
+                       ((ExprNodeColumnDesc) expr).getColumn());
+                   //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later
+                   //if columns contained in agg func are index key columns
+                   canApplyCtx.getAggFuncColList().add(
+                       ((ExprNodeColumnDesc) expr).getColumn());
+                   canApplyCtx.setAggFunction("_count_of_" +
+                       ((ExprNodeColumnDesc) expr).getColumn() + "");
+                 } else if(expr instanceof ExprNodeConstantDesc) {
+                   //count(1) case
+                   canApplyCtx.setCountOfOne(true);
+                   canApplyCtx.setAggFunction("_count_of_1");
+                 }
+               } else {
+                 throw new SemanticException("Invalid number of arguments for count");
+               }
+             }
+         }
+
+         //we need to have non-null group-by keys for a valid group-by operator
+         List<ExprNodeDesc> keyList = conf.getKeys();
+         if(keyList == null || keyList.size() == 0){
+           canApplyCtx.setGbyKeysFetchException(true);
+         }
+         for (ExprNodeDesc expr : keyList) {
+           checkExpression(canApplyCtx, expr);
+         }
+       }
+       return null;
+     }
+
+     private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){
+       if(expr instanceof ExprNodeColumnDesc){
+         //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later
+         //if all keys are from index columns
+         canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+       }else if(expr instanceof ExprNodeGenericFuncDesc){
+         ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc)expr;
+         List<ExprNodeDesc> childExprs = funcExpr.getChildren();
+         for (ExprNodeDesc childExpr : childExprs) {
+           if(childExpr instanceof ExprNodeColumnDesc){
+             canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+             canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
+           }else if(childExpr instanceof ExprNodeGenericFuncDesc){
+             checkExpression(canApplyCtx, childExpr);
+           }
+         }
+       }
+     }
+   }
+
+   public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) {
+     return new CheckGroupByProc(topOp);
+   }
 }
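
CheckGroupByProc above collects group-by key columns recursively: a plain column reference goes straight into the group-by key list, while a function call such as GROUP BY upper(key) is descended into until column references are found. A simplified standalone sketch of that recursion, with hypothetical stand-ins for ExprNodeColumnDesc and ExprNodeGenericFuncDesc:

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class GroupByKeyWalkSketch {
      // Hypothetical stand-ins for Hive's expression node classes.
      static abstract class Expr {}
      static class ColumnExpr extends Expr {
        final String column;
        ColumnExpr(String column) { this.column = column; }
      }
      static class FuncExpr extends Expr {
        final List<Expr> children = new ArrayList<Expr>();
      }

      // Collect every column a group-by key expression refers to,
      // descending through nested function calls.
      static void collectColumns(Expr expr, Set<String> out) {
        if (expr instanceof ColumnExpr) {
          out.add(((ColumnExpr) expr).column);
        } else if (expr instanceof FuncExpr) {
          for (Expr child : ((FuncExpr) expr).children) {
            collectColumns(child, out);
          }
        }
      }

      public static void main(String[] args) {
        FuncExpr key = new FuncExpr();          // e.g. GROUP BY upper(key)
        key.children.add(new ColumnExpr("key"));
        Set<String> cols = new LinkedHashSet<String>();
        collectColumns(key, cols);
        System.out.println(cols);               // [key]
      }
    }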

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Mon Dec  8 05:08:25 2014
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.optimi
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.util.StringUtils;
 
 
 /**
@@ -149,6 +153,10 @@ public class RewriteGBUsingIndex impleme
    * @throws SemanticException
    */
   boolean shouldApplyOptimization() throws SemanticException {
+    if (ifQueryHasMultipleTables()) {
+      //We do not apply this optimization for this case as of now.
+      return false;
+    }
     Map<Table, List<Index>> tableToIndex = getIndexesForRewrite();
     if (tableToIndex.isEmpty()) {
       LOG.debug("No Valid Index Found to apply Rewrite, " +
@@ -162,14 +170,19 @@ public class RewriteGBUsingIndex impleme
      * the tsOpToProcess to apply rewrite later on.
      * */
     Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+    Map<String, Operator<?>> topOps = parseContext.getTopOps();
+
     for (Map.Entry<String, Operator<?>> entry : parseContext.getTopOps().entrySet()) {
+
       String alias = entry.getKey();
       TableScanOperator topOp = (TableScanOperator) entry.getValue();
+
       Table table = topToTable.get(topOp);
       List<Index> indexes = tableToIndex.get(table);
       if (indexes.isEmpty()) {
         continue;
       }
+
       if (table.isPartitioned()) {
         //if base table has partitions, we need to check if index is built for
         //all partitions. If not, then we do not apply the optimization
@@ -183,6 +196,7 @@ public class RewriteGBUsingIndex impleme
       //if there are no partitions on base table
       checkIfRewriteCanBeApplied(alias, topOp, table, indexes);
     }
+
     return !tsOpToProcess.isEmpty();
   }
 
@@ -199,21 +213,26 @@ public class RewriteGBUsingIndex impleme
       Table baseTable, List<Index> indexes) throws SemanticException{
     //Context for checking if this optimization can be applied to the input query
     RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext);
+
     canApplyCtx.setAlias(alias);
     canApplyCtx.setBaseTableName(baseTable.getTableName());
     canApplyCtx.populateRewriteVars(topOp);
-    Map<Index, String> indexTableMap = getIndexToKeysMap(indexes);
-    for (Map.Entry<Index, String> entry : indexTableMap.entrySet()) {
+
+    Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes);
+    for (Map.Entry<Index, Set<String>> entry : indexTableMap.entrySet()) {
       //we rewrite the original query using the first valid index encountered
       //this can be changed if we have a better mechanism to
       //decide which index will produce a better rewrite
       Index index = entry.getKey();
-      String indexKeyName = entry.getValue();
+      Set<String> indexKeyNames = entry.getValue();
       //break here if any valid index is found to apply rewrite
-      if (canApplyCtx.getIndexKey() != null && canApplyCtx.getIndexKey().equals(indexKeyName)
-          && checkIfAllRewriteCriteriaIsMet(canApplyCtx)) {
-        canApplyCtx.setAggFunction("_count_of_" + indexKeyName + "");
-        canApplyCtx.addTable(canApplyCtx.getBaseTableName(), index.getIndexTableName());
+      if (canApplyCtx.isIndexUsableForQueryBranchRewrite(index, indexKeyNames) &&
+          checkIfAllRewriteCriteriaIsMet(canApplyCtx)) {
+        //check if aggregation function is set.
+        //If not, set it using the only indexed column
+        if (canApplyCtx.getAggFunction() == null) {
+          canApplyCtx.setAggFunction("_count_of_" + StringUtils.join(",", indexKeyNames) + "");
+        }
         canApplyCtx.setIndexTableName(index.getIndexTableName());
         tsOpToProcess.put(alias, canApplyCtx);
         return true;
@@ -223,6 +242,27 @@ public class RewriteGBUsingIndex impleme
   }
 
   /**
+   * This block of code iterates over the topToTable map from ParseContext
+   * to determine if the query has a scan over multiple tables.
+   * @return
+   */
+  boolean ifQueryHasMultipleTables(){
+    Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+    Iterator<Table> valuesItr = topToTable.values().iterator();
+    Set<String> tableNameSet = new HashSet<String>();
+    while(valuesItr.hasNext()){
+      Table table = valuesItr.next();
+      tableNameSet.add(table.getTableName());
+    }
+    if(tableNameSet.size() > 1){
+      LOG.debug("Query has more than one table " +
+          "that is not supported with " + getName() + " optimization.");
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Get a list of indexes which can be used for rewrite.
    * @return
    * @throws SemanticException
@@ -279,16 +319,19 @@ public class RewriteGBUsingIndex impleme
    * @return
    * @throws SemanticException
    */
-  Map<Index, String> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
+  Map<Index, Set<String>> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
     Hive hiveInstance = hiveDb;
-    Map<Index, String> indexToKeysMap = new LinkedHashMap<Index, String>();
+    Map<Index, Set<String>> indexToKeysMap = new LinkedHashMap<Index, Set<String>>();
      for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++)  {
+      final Set<String> indexKeyNames = new LinkedHashSet<String>();
       Index index = indexTables.get(idxCtr);
        //Getting index key columns
       StorageDescriptor sd = index.getSd();
       List<FieldSchema> idxColList = sd.getCols();
-      assert idxColList.size()==1;
-      String indexKeyName = idxColList.get(0).getName();
+      for (FieldSchema fieldSchema : idxColList) {
+        indexKeyNames.add(fieldSchema.getName());
+      }
+      assert indexKeyNames.size()==1;
       // Check that the index schema is as expected. This code block should
       // catch problems of this rewrite breaking when the AggregateIndexHandler
       // index is changed.
@@ -312,7 +355,7 @@ public class RewriteGBUsingIndex impleme
       // and defer the decision of using a particular index for later
       // this is to allow choosing a index if a better mechanism is
       // designed later to chose a better rewrite
-      indexToKeysMap.put(index, indexKeyName);
+      indexToKeysMap.put(index, indexKeyNames);
     }
     return indexToKeysMap;
   }
@@ -323,11 +366,20 @@ public class RewriteGBUsingIndex impleme
    * @throws SemanticException
    *
    */
+  @SuppressWarnings("unchecked")
   private void rewriteOriginalQuery() throws SemanticException {
-    for (RewriteCanApplyCtx canApplyCtx : tsOpToProcess.values()) {
+    Map<String, Operator<?>> topOpMap = parseContext.getTopOps();
+    Iterator<String> tsOpItr = tsOpToProcess.keySet().iterator();
+
+    for (Map.Entry<String, RewriteCanApplyCtx> entry : tsOpToProcess.entrySet()) {
+      String alias = entry.getKey();
+      RewriteCanApplyCtx canApplyCtx = entry.getValue();
+      TableScanOperator topOp = (TableScanOperator) topOpMap.get(alias);
       RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx =
-          RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, canApplyCtx);
-      rewriteQueryCtx.invokeRewriteQueryProc();
+        RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb,
+            canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(),
+            canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction());
+      rewriteQueryCtx.invokeRewriteQueryProc(topOp);
       parseContext = rewriteQueryCtx.getParseContext();
       parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>,
           OpParseContext>) rewriteQueryCtx.getOpc());
@@ -340,20 +392,45 @@ public class RewriteGBUsingIndex impleme
    * This method logs the reason for which we cannot apply the rewrite optimization.
    * @return
    */
-  boolean checkIfAllRewriteCriteriaIsMet(RewriteCanApplyCtx canApplyCtx) {
-    if (canApplyCtx.isSelClauseColsFetchException()) {
-      LOG.debug("Got exception while locating child col refs for select list, " + "skipping "
-          + getName() + " optimization.");
+  boolean checkIfAllRewriteCriteriaIsMet(RewriteCanApplyCtx canApplyCtx){
+    if (canApplyCtx.getAggFuncCnt() > 1){
+      LOG.debug("More than 1 agg funcs: " +
+          "Not supported by " + getName() + " optimization.");
+      return false;
+    }
+    if (canApplyCtx.isAggFuncIsNotCount()){
+      LOG.debug("Agg func other than count is " +
+          "not supported by " + getName() + " optimization.");
+      return false;
+    }
+    if (canApplyCtx.isCountOnAllCols()){
+      LOG.debug("Currently count function needs group by on key columns. This is a count(*) case.,"
+          + "Cannot apply this " + getName() + " optimization.");
+      return false;
+    }
+    if (canApplyCtx.isCountOfOne()){
+      LOG.debug("Currently count function needs group by on key columns. This is a count(1) case.,"
+          + "Cannot apply this " + getName() + " optimization.");
+      return false;
+    }
+    if (canApplyCtx.isAggFuncColsFetchException()){
+      LOG.debug("Got exception while locating child col refs " +
+          "of agg func, skipping " + getName() + " optimization.");
+      return false;
+    }
+    if (canApplyCtx.isWhrClauseColsFetchException()){
+      LOG.debug("Got exception while locating child col refs for where clause, "
+          + "skipping " + getName() + " optimization.");
       return false;
     }
-    if (canApplyCtx.isAggFuncIsNotCount()) {
-      LOG.debug("Agg func other than count is " + "not supported by " + getName()
-          + " optimization.");
+    if (canApplyCtx.isSelClauseColsFetchException()){
+      LOG.debug("Got exception while locating child col refs for select list, "
+          + "skipping " + getName() + " optimization.");
       return false;
     }
-    if (canApplyCtx.isAggParameterException()) {
-      LOG.debug("Got exception while locating parameter refs for aggregation, " + "skipping "
-          + getName() + " optimization.");
+    if (canApplyCtx.isGbyKeysFetchException()){
+      LOG.debug("Got exception while locating child col refs for GroupBy key, "
+          + "skipping " + getName() + " optimization.");
       return false;
     }
     return true;
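
The new ifQueryHasMultipleTables() guard above deduplicates the base-table names behind the query's scan operators and skips the optimization when more than one distinct table is scanned; a self-join over a single table deduplicates to one name and is still considered. A standalone sketch of that check (simplified types, not Hive API):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MultiTableGuardSketch {
      // True when the scans reference more than one distinct table.
      static boolean queryHasMultipleTables(List<String> scannedTableNames) {
        Set<String> distinct = new HashSet<String>(scannedTableNames);
        return distinct.size() > 1;
      }

      public static void main(String[] args) {
        // self-join: two scans, one table -> rewrite still considered
        System.out.println(queryHasMultipleTables(Arrays.asList("tbl", "tbl")));   // false
        // real join: two distinct tables -> rewrite skipped
        System.out.println(queryHasMultipleTables(Arrays.asList("tbl", "other"))); // true
      }
    }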

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Mon Dec  8 05:08:25 2014
@@ -19,46 +19,32 @@
 package org.apache.hadoop.hive.ql.optimizer.index;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 /**
  * RewriteQueryUsingAggregateIndexCtx class stores the
@@ -67,37 +53,37 @@ import org.apache.hadoop.hive.serde2.typ
  */
 
 public final class RewriteQueryUsingAggregateIndexCtx  implements NodeProcessorCtx {
-  private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndexCtx.class.getName());
+
   private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
-      RewriteCanApplyCtx canApplyCtx) {
+      String indexTableName, String alias, Set<String> columns, String aggregateFunction) {
     this.parseContext = parseContext;
     this.hiveDb = hiveDb;
-    this.canApplyCtx = canApplyCtx;
-    this.indexTableName = canApplyCtx.getIndexTableName();
-    this.alias = canApplyCtx.getAlias();
-    this.aggregateFunction = canApplyCtx.getAggFunction();
+    this.indexTableName = indexTableName;
+    this.alias = alias;
+    this.aggregateFunction = aggregateFunction;
+    this.columns = columns;
     this.opc = parseContext.getOpParseCtx();
-    this.indexKey = canApplyCtx.getIndexKey();
   }
 
   public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
-      Hive hiveDb, RewriteCanApplyCtx canApplyCtx) {
+      Hive hiveDb, String indexTableName, String alias,
+      Set<String> columns, String aggregateFunction) {
     return new RewriteQueryUsingAggregateIndexCtx(
-        parseContext, hiveDb, canApplyCtx);
+        parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction);
   }
 
+
   private Map<Operator<? extends OperatorDesc>, OpParseContext> opc =
     new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
   private final Hive hiveDb;
   private final ParseContext parseContext;
-  private RewriteCanApplyCtx canApplyCtx;
   //We need the GenericUDAFEvaluator for GenericUDAF function "sum"
   private GenericUDAFEvaluator eval = null;
   private final String indexTableName;
   private final String alias;
   private final String aggregateFunction;
+  private final Set<String> columns;
   private ExprNodeColumnDesc aggrExprNode = null;
-  private String indexKey;
 
   public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpc() {
     return opc;
@@ -131,6 +117,54 @@ public final class RewriteQueryUsingAggr
     return aggrExprNode;
   }
 
+ /**
+  * Walk the original operator tree using the {@link DefaultGraphWalker} using the rules.
+  * Each of the rules invoke respective methods from the {@link RewriteQueryUsingAggregateIndex}
+  * to rewrite the original query using aggregate index.
+  *
+  * @param topOp
+  * @throws SemanticException
+  */
+  public void invokeRewriteQueryProc(
+      Operator<? extends OperatorDesc> topOp) throws SemanticException{
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    // replace scan operator containing original table with index table
+    opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+        RewriteQueryUsingAggregateIndex.getReplaceTableScanProc());
+    //rule that replaces index key selection with
+    //sum(`_count_of_indexed_column`) function in original query
+    opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + "%"),
+        RewriteQueryUsingAggregateIndex.getNewQuerySelectSchemaProc());
+    //Manipulates the ExprNodeDesc from GroupByOperator aggregation list
+    opRules.put(new RuleRegExp("R3", GroupByOperator.getOperatorName() + "%"),
+        RewriteQueryUsingAggregateIndex.getNewQueryGroupbySchemaProc());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, this);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(topOp);
+    ogw.startWalking(topNodes, null);
+  }
+
+ /**
+  * Default procedure for {@link DefaultRuleDispatcher}.
+  * @return
+  */
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
   public String getAlias() {
     return alias;
   }
@@ -139,215 +173,7 @@ public final class RewriteQueryUsingAggr
     return aggregateFunction;
   }
 
-  public String getIndexKey() {
-    return indexKey;
-  }
-
-  public void setIndexKey(String indexKey) {
-    this.indexKey = indexKey;
-  }
-
-  public void invokeRewriteQueryProc() throws SemanticException {
-    this.replaceTableScanProcess(canApplyCtx.getTableScanOperator());
-    //We need aggrExprNode. Thus, replaceGroupByOperatorProcess should come before replaceSelectOperatorProcess
-    for (int index = 0; index < canApplyCtx.getGroupByOperators().size(); index++) {
-      this.replaceGroupByOperatorProcess(canApplyCtx.getGroupByOperators().get(index), index);
-    }
-    for (SelectOperator selectperator : canApplyCtx.getSelectOperators()) {
-      this.replaceSelectOperatorProcess(selectperator);
-    }
-  }
-  
-  /**
-   * This method replaces the original TableScanOperator with the new
-   * TableScanOperator and metadata that scans over the index table rather than
-   * scanning over the original table.
-   *
-   */
-  private void replaceTableScanProcess(TableScanOperator scanOperator) throws SemanticException {
-    RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
-    String alias = rewriteQueryCtx.getAlias();
-
-    // Need to remove the original TableScanOperators from these data structures
-    // and add new ones
-    Map<TableScanOperator, Table> topToTable = rewriteQueryCtx.getParseContext().getTopToTable();
-    Map<String, Operator<? extends OperatorDesc>> topOps = rewriteQueryCtx.getParseContext()
-        .getTopOps();
-    Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContext = rewriteQueryCtx
-        .getParseContext().getOpParseCtx();
-
-    // need this to set rowResolver for new scanOperator
-    OpParseContext operatorContext = opParseContext.get(scanOperator);
-
-    // remove original TableScanOperator
-    topOps.remove(alias);
-    topToTable.remove(scanOperator);
-    opParseContext.remove(scanOperator);
-
-    // construct a new descriptor for the index table scan
-    TableScanDesc indexTableScanDesc = new TableScanDesc();
-    indexTableScanDesc.setGatherStats(false);
-
-    String indexTableName = rewriteQueryCtx.getIndexName();
-    Table indexTableHandle = null;
-    try {
-      indexTableHandle = rewriteQueryCtx.getHiveDb().getTable(indexTableName);
-    } catch (HiveException e) {
-      LOG.error("Error while getting the table handle for index table.");
-      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      throw new SemanticException(e.getMessage(), e);
-    }
-
-    String k = indexTableName + Path.SEPARATOR;
-    indexTableScanDesc.setStatsAggPrefix(k);
-    scanOperator.setConf(indexTableScanDesc);
-
-    // Construct the new RowResolver for the new TableScanOperator
-    RowResolver rr = new RowResolver();
-    try {
-      StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle
-          .getDeserializer().getObjectInspector();
-      StructField field = rowObjectInspector.getStructFieldRef(rewriteQueryCtx.getIndexKey());
-      rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(),
-          TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()),
-          indexTableName, false));
-    } catch (SerDeException e) {
-      LOG.error("Error while creating the RowResolver for new TableScanOperator.");
-      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-      throw new SemanticException(e.getMessage(), e);
-    }
-
-    // Set row resolver for new table
-    operatorContext.setRowResolver(rr);
-
-    String newAlias = indexTableName;
-    int index = alias.lastIndexOf(":");
-    if (index >= 0) {
-      newAlias = alias.substring(0, index) + ":" + indexTableName;
-    }
-
-    // Scan operator now points to other table
-    topToTable.put(scanOperator, indexTableHandle);
-    scanOperator.getConf().setAlias(newAlias);
-    scanOperator.setAlias(indexTableName);
-    topOps.put(newAlias, scanOperator);
-    opParseContext.put(scanOperator, operatorContext);
-    rewriteQueryCtx.getParseContext().setTopToTable((HashMap<TableScanOperator, Table>) topToTable);
-    rewriteQueryCtx.getParseContext().setTopOps(
-        (HashMap<String, Operator<? extends OperatorDesc>>) topOps);
-    rewriteQueryCtx.getParseContext().setOpParseCtx(
-        (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext);
-
-    ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr,
-        Arrays.asList(rewriteQueryCtx.getIndexKey()));
-  }
-
-  /**
-   * This method replaces the original SelectOperator with the new
-   * SelectOperator with a new column indexed_key_column.
-   */
-  private void replaceSelectOperatorProcess(SelectOperator operator) throws SemanticException {
-    RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
-    // we need to set the colList, outputColumnNames, colExprMap,
-    // rowSchema for only that SelectOperator which precedes the GroupByOperator
-    // count(indexed_key_column) needs to be replaced by
-    // sum(`_count_of_indexed_key_column`)
-    List<ExprNodeDesc> selColList = operator.getConf().getColList();
-    selColList.add(rewriteQueryCtx.getAggrExprNode());
-
-    List<String> selOutputColNames = operator.getConf().getOutputColumnNames();
-    selOutputColNames.add(rewriteQueryCtx.getAggrExprNode().getColumn());
-
-    operator.getColumnExprMap().put(rewriteQueryCtx.getAggrExprNode().getColumn(),
-        rewriteQueryCtx.getAggrExprNode());
-
-    RowSchema selRS = operator.getSchema();
-    List<ColumnInfo> selRSSignature = selRS.getSignature();
-    // Need to create a new type for Column[_count_of_indexed_key_column] node
-    PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("bigint");
-    pti.setTypeName("bigint");
-    ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false);
-    selRSSignature.add(newCI);
-    selRS.setSignature((ArrayList<ColumnInfo>) selRSSignature);
-    operator.setSchema(selRS);
-  }
-
-  /**
-   * We need to replace the count(indexed_column_key) GenericUDAF aggregation
-   * function for group-by construct to "sum" GenericUDAF. This method creates a
-   * new operator tree for a sample query that creates a GroupByOperator with
-   * sum aggregation function and uses that GroupByOperator information to
-   * replace the original GroupByOperator aggregation information. It replaces
-   * the AggregationDesc (aggregation descriptor) of the old GroupByOperator
-   * with the new Aggregation Desc of the new GroupByOperator.
-   * @return
-   */
-  private void replaceGroupByOperatorProcess(GroupByOperator operator, int index)
-      throws SemanticException {
-    RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
-
-    // We need to replace the GroupByOperator which is before RS
-    if (index == 0) {
-      // the query contains the sum aggregation GenericUDAF
-      String selReplacementCommand = "select sum(`" + rewriteQueryCtx.getAggregateFunction() + "`)"
-          + " from " + rewriteQueryCtx.getIndexName() + " group by "
-          + rewriteQueryCtx.getIndexKey() + " ";
-      // create a new ParseContext for the query to retrieve its operator tree,
-      // and the required GroupByOperator from it
-      ParseContext newDAGContext = RewriteParseContextGenerator.generateOperatorTree(
-          rewriteQueryCtx.getParseContext().getConf(), selReplacementCommand);
-
-      // we get our new GroupByOperator here
-      Map<GroupByOperator, Set<String>> newGbyOpMap = newDAGContext.getGroupOpToInputTables();
-      GroupByOperator newGbyOperator = newGbyOpMap.keySet().iterator().next();
-      GroupByDesc oldConf = operator.getConf();
-
-      // we need this information to set the correct colList, outputColumnNames
-      // in SelectOperator
-      ExprNodeColumnDesc aggrExprNode = null;
-
-      // Construct the new AggregationDesc to get rid of the current
-      // internal names and replace them with new internal names
-      // as required by the operator tree
-      GroupByDesc newConf = newGbyOperator.getConf();
-      List<AggregationDesc> newAggrList = newConf.getAggregators();
-      if (newAggrList != null && newAggrList.size() > 0) {
-        for (AggregationDesc aggregationDesc : newAggrList) {
-          rewriteQueryCtx.setEval(aggregationDesc.getGenericUDAFEvaluator());
-          aggrExprNode = (ExprNodeColumnDesc) aggregationDesc.getParameters().get(0);
-          rewriteQueryCtx.setAggrExprNode(aggrExprNode);
-        }
-      }
-
-      // Now the GroupByOperator has the new AggregationList;
-      // sum(`_count_of_indexed_key`)
-      // instead of count(indexed_key)
-      OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
-      RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
-      gbyOPC.setRowResolver(gbyRR);
-      rewriteQueryCtx.getOpc().put(operator, gbyOPC);
-
-      oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
-      operator.setConf(oldConf);
-
-    } else {
-      // we just need to reset the GenericUDAFEvaluator and its name for this
-      // GroupByOperator whose parent is the ReduceSinkOperator
-      GroupByDesc childConf = (GroupByDesc) operator.getConf();
-      List<AggregationDesc> childAggrList = childConf.getAggregators();
-      if (childAggrList != null && childAggrList.size() > 0) {
-        for (AggregationDesc aggregationDesc : childAggrList) {
-          List<ExprNodeDesc> paraList = aggregationDesc.getParameters();
-          List<ObjectInspector> parametersOIList = new ArrayList<ObjectInspector>();
-          for (ExprNodeDesc expr : paraList) {
-            parametersOIList.add(expr.getWritableObjectInspector());
-          }
-          GenericUDAFEvaluator evaluator = FunctionRegistry.getGenericUDAFEvaluator("sum",
-              parametersOIList, false, false);
-          aggregationDesc.setGenericUDAFEvaluator(evaluator);
-          aggregationDesc.setGenericUDAFName("sum");
-        }
-      }
-    }
+  public Set<String> getColumns() {
+    return columns;
   }
 }
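
The rewrite in this file rests on a single identity: with a precomputed _count_of_<key> column in the aggregate index, count(key) GROUP BY key over the base table equals sum(_count_of_key) GROUP BY key over the index table, which the R1-R3 rules above apply by retargeting the table scan at the index table and adjusting the select and group-by operators. The test-output diff that follows shows the two equivalent plan shapes. A standalone numeric check of the identity (plain Java, not Hive API):

    import java.util.HashMap;
    import java.util.Map;

    public class CountToSumSketch {
      public static void main(String[] args) {
        int[] baseTableKeys = {1, 1, 2, 2, 2, 3};

        // count(key) GROUP BY key over the base table
        Map<Integer, Long> direct = new HashMap<Integer, Long>();
        for (int k : baseTableKeys) {
          Long prev = direct.get(k);
          direct.put(k, prev == null ? 1L : prev + 1L);
        }

        // the aggregate index stores one row per key with _count_of_key
        Map<Integer, Long> indexTable = new HashMap<Integer, Long>(direct);

        // sum(_count_of_key) GROUP BY key over the index table
        Map<Integer, Long> rewritten = new HashMap<Integer, Long>();
        for (Map.Entry<Integer, Long> row : indexTable.entrySet()) {
          Long prev = rewritten.get(row.getKey());
          rewritten.put(row.getKey(),
              prev == null ? row.getValue() : prev + row.getValue());
        }

        System.out.println(direct.equals(rewritten));  // true
      }
    }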

Modified: hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out Mon Dec  8 05:08:25 2014
@@ -1189,14 +1189,14 @@ STAGE PLANS:
     Map Reduce
       Map Operator Tree:
           TableScan
-            alias: default.default__tbl_tbl_key_idx__
+            alias: tbl
             Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
             Select Operator
-              expressions: key (type: int), _count_of_key (type: bigint)
-              outputColumnNames: key, _count_of_key
+              expressions: key (type: int)
+              outputColumnNames: key
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Group By Operator
-                aggregations: sum(_count_of_key)
+                aggregations: count(key)
                 mode: hash
                 outputColumnNames: _col0
                 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
@@ -1206,7 +1206,7 @@ STAGE PLANS:
                   value expressions: _col0 (type: bigint)
       Reduce Operator Tree:
         Group By Operator
-          aggregations: sum(VALUE._col0)
+          aggregations: count(VALUE._col0)
           mode: mergepartial
           outputColumnNames: _col0
           Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE