Posted to commits@hive.apache.org by ha...@apache.org on 2014/08/10 03:15:38 UTC

svn commit: r1617039 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/index/ java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/optimizer/index/ java/org/apache/hadoop/hive/ql/optimizer/physical/index/ test/results/clientpo...

Author: hashutosh
Date: Sun Aug 10 01:15:37 2014
New Revision: 1617039

URL: http://svn.apache.org/r1617039
Log:
HIVE-7415 : Test TestMinimrCliDriver.testCliDriver_ql_rewrite_gbtoidx failing (Navis via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
    hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java Sun Aug 10 01:15:37 2014
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
@@ -46,12 +45,9 @@ import org.apache.hadoop.hive.ql.plan.Pa
  */
 public class AggregateIndexHandler extends CompactIndexHandler {
 
-  private static Index index = null;
-
     @Override
-    public void analyzeIndexDefinition(Table baseTable, Index idx,
+    public void analyzeIndexDefinition(Table baseTable, Index index,
         Table indexTable) throws HiveException {
-      index = idx;
       StorageDescriptor storageDesc = index.getSd();
       if (this.usesIndexTable() && indexTable != null) {
         StorageDescriptor indexTableSd = storageDesc.deepCopy();
@@ -92,10 +88,11 @@ public class AggregateIndexHandler exten
     @Override
     protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs,
         Set<WriteEntity> outputs,
-        List<FieldSchema> indexField, boolean partitioned,
+        Index index, boolean partitioned,
         PartitionDesc indexTblPartDesc, String indexTableName,
         PartitionDesc baseTablePartDesc, String baseTableName, String dbName) {
 
+      List<FieldSchema> indexField = index.getSd().getCols();
       String indexCols = HiveUtils.getUnparsedColumnNamesFromFieldSchema(indexField);
 
       //form a new insert overwrite query.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java Sun Aug 10 01:15:37 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -64,7 +65,7 @@ public abstract class TableBasedIndexHan
       if (!baseTbl.isPartitioned()) {
         // the table does not have any partition, then create index for the
         // whole table
-        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), false,
+        Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, false,
             new PartitionDesc(desc, null), indexTbl.getTableName(),
             new PartitionDesc(Utilities.getTableDesc(baseTbl), null),
             baseTbl.getTableName(), indexTbl.getDbName());
@@ -88,7 +89,7 @@ public abstract class TableBasedIndexHan
                 "Partitions of base table and index table are inconsistent.");
           }
           // for each partition, spawn a map reduce task.
-          Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(), true,
+          Task<?> indexBuilder = getIndexBuilderMapRedTask(inputs, outputs, index, true,
               new PartitionDesc(indexPart), indexTbl.getTableName(),
               new PartitionDesc(basePart), baseTbl.getTableName(), indexTbl.getDbName());
           indexBuilderTasks.add(indexBuilder);
@@ -100,10 +101,20 @@ public abstract class TableBasedIndexHan
     }
   }
 
-  abstract protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+  protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+      Index index, boolean partitioned,
+      PartitionDesc indexTblPartDesc, String indexTableName,
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+    return getIndexBuilderMapRedTask(inputs, outputs, index.getSd().getCols(),
+        partitioned, indexTblPartDesc, indexTableName, baseTablePartDesc, baseTableName, dbName);
+  }
+
+  protected Task<?> getIndexBuilderMapRedTask(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
       List<FieldSchema> indexField, boolean partitioned,
       PartitionDesc indexTblPartDesc, String indexTableName,
-      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException;
+      PartitionDesc baseTablePartDesc, String baseTableName, String dbName) throws HiveException {
+    return null;
+  }
 
   protected void setStatsDir(HiveConf builderConf) {
     String statsDir;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Sun Aug 10 01:15:37 2014
@@ -372,52 +372,57 @@ public final class ColumnPrunerProcFacto
       
       cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
           cols);
-      List<Integer> neededColumnIds = new ArrayList<Integer>();
-      List<String> neededColumnNames = new ArrayList<String>();
-      List<String> referencedColumnNames = new ArrayList<String>();
       RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
-      TableScanDesc desc = scanOp.getConf();
-      List<VirtualColumn> virtualCols = desc.getVirtualCols();
-      List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>();
-
-      // add virtual columns for ANALYZE TABLE
-      if(scanOp.getConf().isGatherStats()) {
-        cols.add(VirtualColumn.RAWDATASIZE.getName());
-      }
+      setupNeededColumns(scanOp, inputRR, cols);
+      return null;
+    }
+  }
 
-      for (String column : cols) {
-        String[] tabCol = inputRR.reverseLookup(column);
-        if (tabCol == null) {
-          continue;
-        }
-        referencedColumnNames.add(column);
-        ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]);
-        if (colInfo.getIsVirtualCol()) {
-          // part is also a virtual column, but part col should not in this
-          // list.
-          for (int j = 0; j < virtualCols.size(); j++) {
-            VirtualColumn vc = virtualCols.get(j);
-            if (vc.getName().equals(colInfo.getInternalName())) {
-              newVirtualCols.add(vc);
-            }
+  public static void setupNeededColumns(TableScanOperator scanOp, RowResolver inputRR,
+      List<String> cols) throws SemanticException {
+    List<Integer> neededColumnIds = new ArrayList<Integer>();
+    List<String> neededColumnNames = new ArrayList<String>();
+    List<String> referencedColumnNames = new ArrayList<String>();
+    TableScanDesc desc = scanOp.getConf();
+    List<VirtualColumn> virtualCols = desc.getVirtualCols();
+    List<VirtualColumn> newVirtualCols = new ArrayList<VirtualColumn>();
+
+    // add virtual columns for ANALYZE TABLE
+    if(scanOp.getConf().isGatherStats()) {
+      cols.add(VirtualColumn.RAWDATASIZE.getName());
+    }
+
+    for (String column : cols) {
+      String[] tabCol = inputRR.reverseLookup(column);
+      if (tabCol == null) {
+        continue;
+      }
+      referencedColumnNames.add(column);
+      ColumnInfo colInfo = inputRR.get(tabCol[0], tabCol[1]);
+      if (colInfo.getIsVirtualCol()) {
+        // part is also a virtual column, but part col should not in this
+        // list.
+        for (int j = 0; j < virtualCols.size(); j++) {
+          VirtualColumn vc = virtualCols.get(j);
+          if (vc.getName().equals(colInfo.getInternalName())) {
+            newVirtualCols.add(vc);
           }
-          //no need to pass virtual columns to reader.
-          continue;
-        }
-        int position = inputRR.getPosition(column);
-        if (position >= 0) {
-          // get the needed columns by id and name
-          neededColumnIds.add(position);
-          neededColumnNames.add(column);
         }
+        //no need to pass virtual columns to reader.
+        continue;
+      }
+      int position = inputRR.getPosition(column);
+      if (position >= 0) {
+        // get the needed columns by id and name
+        neededColumnIds.add(position);
+        neededColumnNames.add(column);
       }
-
-      desc.setVirtualCols(newVirtualCols);
-      scanOp.setNeededColumnIDs(neededColumnIds);
-      scanOp.setNeededColumns(neededColumnNames);
-      scanOp.setReferencedColumns(referencedColumnNames);
-      return null;
     }
+
+    desc.setVirtualCols(newVirtualCols);
+    scanOp.setNeededColumnIDs(neededColumnIds);
+    scanOp.setNeededColumns(neededColumnNames);
+    scanOp.setReferencedColumns(referencedColumnNames);
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/IndexUtils.java Sun Aug 10 01:15:37 2014
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -57,7 +56,6 @@ import org.apache.hadoop.hive.ql.parse.S
 public final class IndexUtils {
 
   private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName());
-  private static final Map<Index, Table> indexToIndexTable = new HashMap<Index, Table>();
 
   private IndexUtils(){
   }
@@ -71,9 +69,7 @@ public final class IndexUtils {
    * @throws HiveException
    */
   public static Set<Partition> checkPartitionsCoveredByIndex(TableScanOperator tableScan,
-      ParseContext pctx,
-      Map<Table, List<Index>> indexes)
-    throws HiveException {
+      ParseContext pctx, List<Index> indexes) throws HiveException {
     Hive hive = Hive.get(pctx.getConf());
     // make sure each partition exists on the index table
     PrunedPartitionList queryPartitionList = pctx.getOpToPartList().get(tableScan);
@@ -83,7 +79,6 @@ public final class IndexUtils {
     }
 
     for (Partition part : queryPartitions) {
-      List<Table> sourceIndexTables = getIndexTables(hive, part, indexes);
       if (!containsPartition(hive, part, indexes)) {
         return null; // problem if it doesn't contain the partition
       }
@@ -93,63 +88,21 @@ public final class IndexUtils {
   }
 
   /**
-   * return index tables associated with a given base table
-   */
-  private List<Table> getIndexTables(Hive hive, Table table,
-      Map<Table, List<Index>> indexes) throws
-    HiveException {
-    List<Table> indexTables = new ArrayList<Table>();
-    if (indexes == null || indexes.get(table) == null) {
-      return indexTables;
-    }
-    for (Index index : indexes.get(table)) {
-      Table indexTable = hive.getTable(index.getIndexTableName());
-      indexToIndexTable.put(index, indexTable);
-      indexTables.add(indexTable);
-    }
-    return indexTables;
-  }
-
-  /**
-   * return index tables associated with the base table of the partition
-   */
-  private static List<Table> getIndexTables(Hive hive, Partition part,
-      Map<Table, List<Index>> indexes) throws HiveException {
-    List<Table> indexTables = new ArrayList<Table>();
-    Table partitionedTable = part.getTable();
-    if (indexes == null || indexes.get(partitionedTable) == null) {
-      return indexTables;
-    }
-    for (Index index : indexes.get(partitionedTable)) {
-      Table indexTable = hive.getTable(index.getIndexTableName());
-      indexToIndexTable.put(index, indexTable);
-      indexTables.add(indexTable);
-    }
-    return indexTables;
-  }
-
-  /**
    * check that every index table contains the given partition and is fresh
    */
-  private static boolean containsPartition(Hive hive, Partition part,
-      Map<Table, List<Index>> indexes)
-    throws HiveException {
+  private static boolean containsPartition(Hive hive, Partition part, List<Index> indexes)
+      throws HiveException {
     HashMap<String, String> partSpec = part.getSpec();
-
-    if (indexes == null || indexes.get(part.getTable()) == null) {
-      return false;
-    }
-
     if (partSpec.isEmpty()) {
       // empty specs come from non-partitioned tables
-      return isIndexTableFresh(hive, indexes.get(part.getTable()), part.getTable());
+      return isIndexTableFresh(hive, indexes, part.getTable());
     }
 
-    for (Index index : indexes.get(part.getTable())) {
-      Table indexTable = indexToIndexTable.get(index);
+    for (Index index : indexes) {
+      Table indexTable = hive.getTable(index.getIndexTableName());
       // get partitions that match the spec
-      List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec);
-      if (matchingPartitions == null || matchingPartitions.size() == 0) {
+      Partition matchingPartition = hive.getPartition(indexTable, partSpec, false);
+      if (matchingPartition == null) {
         LOG.info("Index table " + indexTable + "did not contain built partition that matched " + partSpec);
         return false;
       } else if (!isIndexPartitionFresh(hive, index, part)) {
@@ -160,7 +113,7 @@ public final class IndexUtils {
   }
 
   /**
-   * Check the index partitions on a parttioned table exist and are fresh
+   * Check the index partitions on a partitioned table exist and are fresh
    */
   private static boolean isIndexPartitionFresh(Hive hive, Index index,
       Partition part) throws HiveException {
@@ -187,7 +140,7 @@ public final class IndexUtils {
   }
 
   /**
-   * Check that the indexes on the unpartioned table exist and are fresh
+   * Check that the indexes on the un-partitioned table exist and are fresh
    */
   private static boolean isIndexTableFresh(Hive hive, List<Index> indexes, Table src)
     throws HiveException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java Sun Aug 10 01:15:37 2014
@@ -30,10 +30,9 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
  * RewriteCanApplyCtx class stores the context for the {@link RewriteCanApplyProcFactory}
@@ -84,7 +82,9 @@ public final class RewriteCanApplyCtx im
   private Set<String> aggFuncColList = new LinkedHashSet<String>();
 
   private final ParseContext parseContext;
+  private String alias;
   private String baseTableName;
+  private String indexTableName;
   private String aggFunction;
 
   void resetCanApplyCtx(){
@@ -230,6 +230,14 @@ public final class RewriteCanApplyCtx im
     this.aggFuncCnt = aggFuncCnt;
   }
 
+  public String getAlias() {
+    return alias;
+  }
+
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+
   public String getBaseTableName() {
     return baseTableName;
   }
@@ -238,10 +246,26 @@ public final class RewriteCanApplyCtx im
     this.baseTableName = baseTableName;
   }
 
+  public String getIndexTableName() {
+    return indexTableName;
+  }
+
+  public void setIndexTableName(String indexTableName) {
+    this.indexTableName = indexTableName;
+  }
+
   public  ParseContext getParseContext() {
     return parseContext;
   }
 
+  public Set<String> getAllColumns() {
+    Set<String> allColumns = new LinkedHashSet<String>(selectColumnsList);
+    allColumns.addAll(predicateColumnsList);
+    allColumns.addAll(gbKeyNameList);
+    allColumns.addAll(aggFuncColList);
+    return allColumns;
+  }
+
 
   /**
    * This method walks all the nodes starting from topOp TableScanOperator node
@@ -255,15 +279,13 @@ public final class RewriteCanApplyCtx im
    * @param topOp
    * @throws SemanticException
    */
-  void populateRewriteVars(Operator<? extends OperatorDesc> topOp)
+  void populateRewriteVars(TableScanOperator topOp)
     throws SemanticException{
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"),
-        RewriteCanApplyProcFactory.canApplyOnFilterOperator());
+        RewriteCanApplyProcFactory.canApplyOnFilterOperator(topOp));
     opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + "%"),
-        RewriteCanApplyProcFactory.canApplyOnGroupByOperator());
-    opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + "%"),
-        RewriteCanApplyProcFactory.canApplyOnSelectOperator());
+        RewriteCanApplyProcFactory.canApplyOnGroupByOperator(topOp));
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -364,5 +386,4 @@ public final class RewriteCanApplyCtx im
     }
     return true;
   }
-
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java Sun Aug 10 01:15:37 2014
@@ -18,19 +18,9 @@
 
 package org.apache.hadoop.hive.ql.optimizer.index;
 
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -39,10 +29,13 @@ import org.apache.hadoop.hive.ql.plan.Ag
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.List;
+import java.util.Stack;
 
 /**
  * Factory of methods used by {@link RewriteGBUsingIndex}
@@ -50,43 +43,46 @@ import org.apache.hadoop.hive.ql.plan.Op
  *
  */
 public final class RewriteCanApplyProcFactory {
-  private static RewriteCanApplyCtx canApplyCtx = null;
-
-  private RewriteCanApplyProcFactory(){
-    //this prevents the class from getting instantiated
-  }
 
   /**
    * Check for conditions in FilterOperator that do not meet rewrite criteria.
    */
   private static class CheckFilterProc implements NodeProcessor {
+
+    private TableScanOperator topOp;
+
+    public CheckFilterProc(TableScanOperator topOp) {
+      this.topOp = topOp;
+    }
+
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       FilterOperator operator = (FilterOperator)nd;
-      canApplyCtx = (RewriteCanApplyCtx)ctx;
-      FilterDesc conf = (FilterDesc)operator.getConf();
+      RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
+      FilterDesc conf = operator.getConf();
       //The filter operator should have a predicate of ExprNodeGenericFuncDesc type.
       //This represents the comparison operator
-      ExprNodeGenericFuncDesc oldengfd = (ExprNodeGenericFuncDesc) conf.getPredicate();
+      ExprNodeDesc oldengfd = conf.getPredicate();
       if(oldengfd == null){
         canApplyCtx.setWhrClauseColsFetchException(true);
+        return null;
       }
-      //The predicate should have valid left and right columns
-      List<String> colList = oldengfd.getCols();
-      if(colList == null || colList.size() == 0){
+      ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(oldengfd, operator, topOp);
+      if (backtrack == null) {
         canApplyCtx.setWhrClauseColsFetchException(true);
+        return null;
       }
       //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later
       //if index keys contain all filter predicate columns and vice-a-versa
-      for (String col : colList) {
+      for (String col : backtrack.getCols()) {
         canApplyCtx.getPredicateColumnsList().add(col);
       }
       return null;
     }
   }
 
- public static CheckFilterProc canApplyOnFilterOperator() {
-    return new CheckFilterProc();
+ public static CheckFilterProc canApplyOnFilterOperator(TableScanOperator topOp) {
+    return new CheckFilterProc(topOp);
   }
 
    /**
@@ -95,10 +91,16 @@ public final class RewriteCanApplyProcFa
    */
   private static class CheckGroupByProc implements NodeProcessor {
 
+     private TableScanOperator topOp;
+
+     public CheckGroupByProc(TableScanOperator topOp) {
+       this.topOp = topOp;
+     }
+
      public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
          Object... nodeOutputs) throws SemanticException {
        GroupByOperator operator = (GroupByOperator)nd;
-       canApplyCtx = (RewriteCanApplyCtx)ctx;
+       RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
        //for each group-by clause in query, only one GroupByOperator of the
        //GBY-RS-GBY sequence is stored in  getGroupOpToInputTables
        //we need to process only this operator
@@ -107,7 +109,7 @@ public final class RewriteCanApplyProcFa
            !canApplyCtx.isQueryHasGroupBy()){
 
          canApplyCtx.setQueryHasGroupBy(true);
-         GroupByDesc conf = (GroupByDesc) operator.getConf();
+         GroupByDesc conf = operator.getConf();
          List<AggregationDesc> aggrList = conf.getAggregators();
          if(aggrList != null && aggrList.size() > 0){
              for (AggregationDesc aggregationDesc : aggrList) {
@@ -119,40 +121,39 @@ public final class RewriteCanApplyProcFa
                String aggFunc = aggregationDesc.getGenericUDAFName();
                if(!("count".equals(aggFunc))){
                  canApplyCtx.setAggFuncIsNotCount(true);
-               }else{
-                List<ExprNodeDesc> para = aggregationDesc.getParameters();
-                //for a valid aggregation, it needs to have non-null parameter list
-                 if(para == null){
-                   canApplyCtx.setAggFuncColsFetchException(true);
-                 }else if(para.size() == 0){
-                   //count(*) case
-                   canApplyCtx.setCountOnAllCols(true);
-                   canApplyCtx.setAggFunction("_count_of_all");
-                 }else{
-                   assert para.size()==1;
-                   for(int i=0; i< para.size(); i++){
-                     ExprNodeDesc expr = para.get(i);
-                     if(expr instanceof ExprNodeColumnDesc){
-                       //Add the columns to RewriteCanApplyCtx's selectColumnsList list
-                       //to check later if index keys contain all select clause columns
-                       //and vice-a-versa. We get the select column 'actual' names only here
-                       //if we have a agg func along with group-by
-                       //SelectOperator has internal names in its colList data structure
-                       canApplyCtx.getSelectColumnsList().add(
-                           ((ExprNodeColumnDesc) expr).getColumn());
-                       //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later
-                       //if columns contained in agg func are index key columns
-                       canApplyCtx.getAggFuncColList().add(
-                           ((ExprNodeColumnDesc) expr).getColumn());
-                       canApplyCtx.setAggFunction("_count_of_" +
-                           ((ExprNodeColumnDesc) expr).getColumn() + "");
-                     }else if(expr instanceof ExprNodeConstantDesc){
-                       //count(1) case
-                       canApplyCtx.setCountOfOne(true);
-                       canApplyCtx.setAggFunction("_count_of_1");
-                     }
-                   }
+                 return false;
+               }
+               List<ExprNodeDesc> para = aggregationDesc.getParameters();
+               //for a valid aggregation, it needs to have non-null parameter list
+               if (para == null) {
+                 canApplyCtx.setAggFuncColsFetchException(true);
+               } else if (para.size() == 0) {
+                 //count(*) case
+                 canApplyCtx.setCountOnAllCols(true);
+                 canApplyCtx.setAggFunction("_count_of_all");
+               } else if (para.size() == 1) {
+                 ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator, topOp);
+                 if (expr instanceof ExprNodeColumnDesc){
+                   //Add the columns to RewriteCanApplyCtx's selectColumnsList list
+                   //to check later if index keys contain all select clause columns
+                   //and vice-a-versa. We get the select column 'actual' names only here
+                   //if we have a agg func along with group-by
+                   //SelectOperator has internal names in its colList data structure
+                   canApplyCtx.getSelectColumnsList().add(
+                       ((ExprNodeColumnDesc) expr).getColumn());
+                   //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later
+                   //if columns contained in agg func are index key columns
+                   canApplyCtx.getAggFuncColList().add(
+                       ((ExprNodeColumnDesc) expr).getColumn());
+                   canApplyCtx.setAggFunction("_count_of_" +
+                       ((ExprNodeColumnDesc) expr).getColumn() + "");
+                 } else if(expr instanceof ExprNodeConstantDesc) {
+                   //count(1) case
+                   canApplyCtx.setCountOfOne(true);
+                   canApplyCtx.setAggFunction("_count_of_1");
                  }
+               } else {
+                 throw new SemanticException("Invalid number of arguments for count");
                }
              }
          }
@@ -163,13 +164,13 @@ public final class RewriteCanApplyProcFa
            canApplyCtx.setGbyKeysFetchException(true);
          }
          for (ExprNodeDesc expr : keyList) {
-           checkExpression(expr);
+           checkExpression(canApplyCtx, expr);
          }
        }
        return null;
      }
 
-     private void checkExpression(ExprNodeDesc expr){
+     private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){
        if(expr instanceof ExprNodeColumnDesc){
          //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later
          //if all keys are from index columns
@@ -182,59 +183,14 @@ public final class RewriteCanApplyProcFa
              canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
              canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
            }else if(childExpr instanceof ExprNodeGenericFuncDesc){
-             checkExpression(childExpr);
+             checkExpression(canApplyCtx, childExpr);
            }
          }
        }
      }
    }
 
-
-   public static CheckGroupByProc canApplyOnGroupByOperator() {
-     return new CheckGroupByProc();
+   public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) {
+     return new CheckGroupByProc(topOp);
    }
-
-
- /**
-   * Check for conditions in SelectOperator that do not meet rewrite criteria.
-   */
-  private static class CheckSelectProc implements NodeProcessor {
-     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
-         Object... nodeOutputs) throws SemanticException {
-       SelectOperator operator = (SelectOperator)nd;
-       canApplyCtx = (RewriteCanApplyCtx)ctx;
-
-       List<Operator<? extends OperatorDesc>> childrenList = operator.getChildOperators();
-       Operator<? extends OperatorDesc> child = childrenList.get(0);
-       if(child instanceof FileSinkOperator){
-         Map<String, String> internalToAlias = new LinkedHashMap<String, String>();
-         RowSchema rs = operator.getSchema();
-         //to get the internal to alias mapping
-         List<ColumnInfo> sign = rs.getSignature();
-         for (ColumnInfo columnInfo : sign) {
-           internalToAlias.put(columnInfo.getInternalName(), columnInfo.getAlias());
-         }
-
-         //if FilterOperator predicate has internal column names,
-         //we need to retrieve the 'actual' column names to
-         //check if index keys contain all filter predicate columns and vice-a-versa
-         Iterator<String> predItr = canApplyCtx.getPredicateColumnsList().iterator();
-         while(predItr.hasNext()){
-           String predCol = predItr.next();
-           String newPredCol = "";
-           if(internalToAlias.get(predCol) != null){
-             newPredCol = internalToAlias.get(predCol);
-             canApplyCtx.getPredicateColumnsList().remove(predCol);
-             canApplyCtx.getPredicateColumnsList().add(newPredCol);
-           }
-         }
-       }
-       return null;
-     }
-   }
-
-   public static CheckSelectProc canApplyOnSelectOperator() {
-     return new CheckSelectProc();
-   }
-
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Sun Aug 10 01:15:37 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.util.StringUtils;
 
 
 /**
@@ -106,11 +107,6 @@ public class RewriteGBUsingIndex impleme
   private final Map<String, RewriteCanApplyCtx> tsOpToProcess =
     new LinkedHashMap<String, RewriteCanApplyCtx>();
 
-  //Name of the current table on which rewrite is being performed
-  private String baseTableName = null;
-  //Name of the current index which is used for rewrite
-  private String indexTableName = null;
-
   //Index Validation Variables
   private static final String IDX_BUCKET_COL = "_bucketname";
   private static final String IDX_OFFSETS_ARRAY_COL = "_offsets";
@@ -133,7 +129,7 @@ public class RewriteGBUsingIndex impleme
     /* Check if the input query passes all the tests to be eligible for a rewrite
      * If yes, rewrite original query; else, return the current parseContext
      */
-    if(shouldApplyOptimization()){
+    if (shouldApplyOptimization()) {
       LOG.info("Rewriting Original Query using " + getName() + " optimization.");
       rewriteOriginalQuery();
     }
@@ -155,59 +151,52 @@ public class RewriteGBUsingIndex impleme
    * @return
    * @throws SemanticException
    */
-  boolean shouldApplyOptimization() throws SemanticException{
-    boolean canApply = false;
-    if(ifQueryHasMultipleTables()){
+  boolean shouldApplyOptimization() throws SemanticException {
+    if (ifQueryHasMultipleTables()) {
       //We do not apply this optimization for this case as of now.
       return false;
-    }else{
+    }
+    Map<Table, List<Index>> tableToIndex = getIndexesForRewrite();
+    if (tableToIndex.isEmpty()) {
+      LOG.debug("No Valid Index Found to apply Rewrite, " +
+          "skipping " + getName() + " optimization");
+      return false;
+    }
     /*
      * This code iterates over each TableScanOperator from the topOps map from ParseContext.
      * For each operator tree originating from this top TableScanOperator, we determine
      * if the optimization can be applied. If yes, we add the name of the top table to
      * the tsOpToProcess to apply rewrite later on.
      * */
-      Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
-      Iterator<TableScanOperator> topOpItr = topToTable.keySet().iterator();
-      while(topOpItr.hasNext()){
-
-        TableScanOperator topOp = topOpItr.next();
-        Table table = topToTable.get(topOp);
-        baseTableName = table.getTableName();
-        Map<Table, List<Index>> indexes = getIndexesForRewrite();
-        if(indexes == null){
-          LOG.debug("Error getting valid indexes for rewrite, " +
-              "skipping " + getName() + " optimization");
-          return false;
-        }
+    Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+    Map<String, Operator<?>> topOps = parseContext.getTopOps();
+
+    for (Map.Entry<String, Operator<?>> entry : parseContext.getTopOps().entrySet()) {
 
-        if(indexes.size() == 0){
-          LOG.debug("No Valid Index Found to apply Rewrite, " +
+      String alias = entry.getKey();
+      TableScanOperator topOp = (TableScanOperator) entry.getValue();
+
+      Table table = topToTable.get(topOp);
+      List<Index> indexes = tableToIndex.get(table);
+      if (indexes.isEmpty()) {
+        continue;
+      }
+
+      if (table.isPartitioned()) {
+        //if base table has partitions, we need to check if index is built for
+        //all partitions. If not, then we do not apply the optimization
+        if (!checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)) {
+          LOG.debug("Index is not built for all table partitions, " +
               "skipping " + getName() + " optimization");
-          return false;
-        }else{
-          //we need to check if the base table has confirmed or unknown partitions
-          if(parseContext.getOpToPartList() != null && parseContext.getOpToPartList().size() > 0){
-            //if base table has partitions, we need to check if index is built for
-            //all partitions. If not, then we do not apply the optimization
-            if(checkIfIndexBuiltOnAllTablePartitions(topOp, indexes)){
-              //check if rewrite can be applied for operator tree
-              //if partitions condition returns true
-              canApply = checkIfRewriteCanBeApplied(topOp, table, indexes);
-            }else{
-              LOG.debug("Index is not built for all table partitions, " +
-                  "skipping " + getName() + " optimization");
-              return false;
-            }
-          }else{
-            //check if rewrite can be applied for operator tree
-            //if there are no partitions on base table
-            canApply = checkIfRewriteCanBeApplied(topOp, table, indexes);
-          }
+          continue;
         }
       }
+      //check if rewrite can be applied for operator tree
+      //if there are no partitions on base table
+      checkIfRewriteCanBeApplied(alias, topOp, table, indexes);
     }
-    return canApply;
+
+    return !tsOpToProcess.isEmpty();
   }
 
   /**
@@ -219,61 +208,36 @@ public class RewriteGBUsingIndex impleme
    * @return - true if rewrite can be applied on the current branch; false otherwise
    * @throws SemanticException
    */
-  private boolean checkIfRewriteCanBeApplied(TableScanOperator topOp, Table baseTable,
-      Map<Table, List<Index>> indexes) throws SemanticException{
-    boolean canApply = false;
+  private boolean checkIfRewriteCanBeApplied(String alias, TableScanOperator topOp,
+      Table baseTable, List<Index> indexes) throws SemanticException{
     //Context for checking if this optimization can be applied to the input query
     RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext);
-    Map<String, Operator<? extends OperatorDesc>> topOps = parseContext.getTopOps();
 
-    canApplyCtx.setBaseTableName(baseTableName);
+    canApplyCtx.setAlias(alias);
+    canApplyCtx.setBaseTableName(baseTable.getTableName());
     canApplyCtx.populateRewriteVars(topOp);
 
-    Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes.get(baseTable));
-    Iterator<Index> indexMapItr = indexTableMap.keySet().iterator();
-    Index index = null;
-    while(indexMapItr.hasNext()){
+    Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes);
+    for (Map.Entry<Index, Set<String>> entry : indexTableMap.entrySet()) {
       //we rewrite the original query using the first valid index encountered
       //this can be changed if we have a better mechanism to
       //decide which index will produce a better rewrite
-      index = indexMapItr.next();
-      canApply = canApplyCtx.isIndexUsableForQueryBranchRewrite(index,
-          indexTableMap.get(index));
-      if(canApply){
-        canApply = checkIfAllRewriteCriteriaIsMet(canApplyCtx);
-        //break here if any valid index is found to apply rewrite
-        if(canApply){
-          //check if aggregation function is set.
-          //If not, set it using the only indexed column
-          if(canApplyCtx.getAggFunction() == null){
-            //strip of the start and end braces [...]
-            String aggregationFunction = indexTableMap.get(index).toString();
-            aggregationFunction = aggregationFunction.substring(1,
-                aggregationFunction.length() - 1);
-            canApplyCtx.setAggFunction("_count_of_" + aggregationFunction + "");
-          }
+      Index index = entry.getKey();
+      Set<String> indexKeyNames = entry.getValue();
+      //break here if any valid index is found to apply rewrite
+      if (canApplyCtx.isIndexUsableForQueryBranchRewrite(index, indexKeyNames) &&
+          checkIfAllRewriteCriteriaIsMet(canApplyCtx)) {
+        //check if aggregation function is set.
+        //If not, set it using the only indexed column
+        if (canApplyCtx.getAggFunction() == null) {
+          canApplyCtx.setAggFunction("_count_of_" + StringUtils.join(",", indexKeyNames) + "");
         }
-        break;
+        canApplyCtx.setIndexTableName(index.getIndexTableName());
+        tsOpToProcess.put(alias, canApplyCtx);
+        return true;
       }
     }
-    indexTableName = index.getIndexTableName();
-
-    if(canApply && topOps.containsValue(topOp)) {
-      Iterator<String> topOpNamesItr = topOps.keySet().iterator();
-      while(topOpNamesItr.hasNext()){
-        String topOpName = topOpNamesItr.next();
-        if(topOps.get(topOpName).equals(topOp)){
-          tsOpToProcess.put(topOpName, canApplyCtx);
-        }
-      }
-    }
-
-    if(tsOpToProcess.size() == 0){
-      canApply = false;
-    }else{
-      canApply = true;
-    }
-    return canApply;
+    return false;
   }
 
   /**
@@ -329,7 +293,7 @@ public class RewriteGBUsingIndex impleme
    * @throws SemanticException
    */
   private boolean checkIfIndexBuiltOnAllTablePartitions(TableScanOperator tableScan,
-      Map<Table, List<Index>> indexes) throws SemanticException{
+      List<Index> indexes) throws SemanticException {
     // check if we have indexes on all partitions in this table scan
     Set<Partition> queryPartitions;
     try {
@@ -341,7 +305,7 @@ public class RewriteGBUsingIndex impleme
       LOG.error("Fatal Error: problem accessing metastore", e);
       throw new SemanticException(e);
     }
-    if(queryPartitions.size() != 0){
+    if (queryPartitions.size() != 0) {
       return true;
     }
     return false;
@@ -355,12 +319,11 @@ public class RewriteGBUsingIndex impleme
    * @throws SemanticException
    */
   Map<Index, Set<String>> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
-    Index index = null;
     Hive hiveInstance = hiveDb;
     Map<Index, Set<String>> indexToKeysMap = new LinkedHashMap<Index, Set<String>>();
      for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++)  {
       final Set<String> indexKeyNames = new LinkedHashSet<String>();
-      index = indexTables.get(idxCtr);
+      Index index = indexTables.get(idxCtr);
        //Getting index key columns
       StorageDescriptor sd = index.getSd();
       List<FieldSchema> idxColList = sd.getCols();
@@ -403,17 +366,17 @@ public class RewriteGBUsingIndex impleme
    */
   @SuppressWarnings("unchecked")
   private void rewriteOriginalQuery() throws SemanticException {
-    Map<String, Operator<? extends OperatorDesc>> topOpMap =
-      (HashMap<String, Operator<? extends OperatorDesc>>) parseContext.getTopOps().clone();
+    Map<String, Operator<?>> topOpMap = parseContext.getTopOps();
     Iterator<String> tsOpItr = tsOpToProcess.keySet().iterator();
 
-    while(tsOpItr.hasNext()){
-      baseTableName = tsOpItr.next();
-      RewriteCanApplyCtx canApplyCtx = tsOpToProcess.get(baseTableName);
-      TableScanOperator topOp = (TableScanOperator) topOpMap.get(baseTableName);
+    for (Map.Entry<String, RewriteCanApplyCtx> entry : tsOpToProcess.entrySet()) {
+      String alias = entry.getKey();
+      RewriteCanApplyCtx canApplyCtx = entry.getValue();
+      TableScanOperator topOp = (TableScanOperator) topOpMap.get(alias);
       RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx =
         RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb,
-            indexTableName, baseTableName, canApplyCtx.getAggFunction());
+            canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(),
+            canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction());
       rewriteQueryCtx.invokeRewriteQueryProc(topOp);
       parseContext = rewriteQueryCtx.getParseContext();
       parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java Sun Aug 10 01:15:37 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
@@ -68,7 +69,6 @@ import org.apache.hadoop.hive.serde2.typ
  */
 public final class RewriteQueryUsingAggregateIndex {
   private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName());
-  private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
 
   private RewriteQueryUsingAggregateIndex() {
     //this prevents the class from getting instantiated
@@ -78,7 +78,7 @@ public final class RewriteQueryUsingAggr
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       SelectOperator operator = (SelectOperator)nd;
-      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
       List<Operator<? extends OperatorDesc>> childOps = operator.getChildOperators();
       Operator<? extends OperatorDesc> childOp = childOps.iterator().next();
 
@@ -98,7 +98,7 @@ public final class RewriteQueryUsingAggr
         List<ColumnInfo> selRSSignature =
           selRS.getSignature();
         //Need to create a new type for Column[_count_of_indexed_key_column] node
-        PrimitiveTypeInfo pti = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("bigint");
         pti.setTypeName("bigint");
         ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false);
         selRSSignature.add(newCI);
@@ -117,19 +117,15 @@ public final class RewriteQueryUsingAggr
   /**
    * This processor replaces the original TableScanOperator with
    * the new TableScanOperator and metadata that scans over the
-   * index table rather than scanning over the orginal table.
+   * index table rather than scanning over the original table.
    *
    */
   private static class ReplaceTableScanOpProc implements NodeProcessor {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       TableScanOperator scanOperator = (TableScanOperator)nd;
-      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
-      String baseTableName = rewriteQueryCtx.getBaseTableName();
-      String alias = null;
-      if(baseTableName.contains(":")){
-        alias = (baseTableName.split(":"))[0];
-      }
+      RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      String alias = rewriteQueryCtx.getAlias();
 
       //Need to remove the original TableScanOperators from these data structures
       // and add new ones
@@ -144,8 +140,8 @@ public final class RewriteQueryUsingAggr
       OpParseContext operatorContext = opParseContext.get(scanOperator);
 
       //remove original TableScanOperator
+      topOps.remove(alias);
       topToTable.remove(scanOperator);
-      topOps.remove(baseTableName);
       opParseContext.remove(scanOperator);
 
       //construct a new descriptor for the index table scan
@@ -171,13 +167,11 @@ public final class RewriteQueryUsingAggr
       try {
         StructObjectInspector rowObjectInspector =
           (StructObjectInspector) indexTableHandle.getDeserializer().getObjectInspector();
-        List<? extends StructField> fields = rowObjectInspector
-        .getAllStructFieldRefs();
-        for (int i = 0; i < fields.size(); i++) {
-          rr.put(indexTableName, fields.get(i).getFieldName(), new ColumnInfo(fields
-              .get(i).getFieldName(), TypeInfoUtils
-              .getTypeInfoFromObjectInspector(fields.get(i)
-                  .getFieldObjectInspector()), indexTableName, false));
+        for (String column : rewriteQueryCtx.getColumns()) {
+          StructField field = rowObjectInspector.getStructFieldRef(column);
+          rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(),
+              TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()),
+              indexTableName, false));
         }
       } catch (SerDeException e) {
         LOG.error("Error while creating the RowResolver for new TableScanOperator.");
@@ -187,18 +181,18 @@ public final class RewriteQueryUsingAggr
 
       //Set row resolver for new table
       operatorContext.setRowResolver(rr);
-      String tabNameWithAlias = null;
-      if(alias != null){
-        tabNameWithAlias = alias + ":" + indexTableName;
-       }else{
-         tabNameWithAlias = indexTableName;
-       }
+
+      String newAlias = indexTableName;
+      int index = alias.lastIndexOf(":");
+      if (index >= 0) {
+        newAlias = alias.substring(0, index) + ":" + indexTableName;
+      }
 
       //Scan operator now points to other table
       topToTable.put(scanOperator, indexTableHandle);
-      scanOperator.getConf().setAlias(tabNameWithAlias);
+      scanOperator.getConf().setAlias(newAlias);
       scanOperator.setAlias(indexTableName);
-      topOps.put(tabNameWithAlias, scanOperator);
+      topOps.put(newAlias, scanOperator);
       opParseContext.put(scanOperator, operatorContext);
       rewriteQueryCtx.getParseContext().setTopToTable(
         (HashMap<TableScanOperator, Table>) topToTable);
@@ -207,6 +201,9 @@ public final class RewriteQueryUsingAggr
       rewriteQueryCtx.getParseContext().setOpParseCtx(
         (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext);
 
+      ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr,
+          new ArrayList<String>(rewriteQueryCtx.getColumns()));
+
       return null;
     }
   }
@@ -228,7 +225,7 @@ public final class RewriteQueryUsingAggr
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
         Object... nodeOutputs) throws SemanticException {
       GroupByOperator operator = (GroupByOperator)nd;
-      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
 
       //We need to replace the GroupByOperator which is in
       //groupOpToInputTables map with the new GroupByOperator

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Sun Aug 10 01:15:37 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -54,19 +55,21 @@ import org.apache.hadoop.hive.ql.udf.gen
 public final class RewriteQueryUsingAggregateIndexCtx  implements NodeProcessorCtx {
 
   private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
-      String indexTableName, String baseTableName, String aggregateFunction){
+      String indexTableName, String alias, Set<String> columns, String aggregateFunction) {
     this.parseContext = parseContext;
     this.hiveDb = hiveDb;
     this.indexTableName = indexTableName;
-    this.baseTableName = baseTableName;
+    this.alias = alias;
     this.aggregateFunction = aggregateFunction;
+    this.columns = columns;
     this.opc = parseContext.getOpParseCtx();
   }
 
   public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
-      Hive hiveDb, String indexTableName, String baseTableName, String aggregateFunction){
+      Hive hiveDb, String indexTableName, String alias,
+      Set<String> columns, String aggregateFunction) {
     return new RewriteQueryUsingAggregateIndexCtx(
-        parseContext, hiveDb, indexTableName, baseTableName, aggregateFunction);
+        parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction);
   }
 
 
@@ -77,8 +80,9 @@ public final class RewriteQueryUsingAggr
   //We need the GenericUDAFEvaluator for GenericUDAF function "sum"
   private GenericUDAFEvaluator eval = null;
   private final String indexTableName;
-  private final String baseTableName;
+  private final String alias;
   private final String aggregateFunction;
+  private final Set<String> columns;
   private ExprNodeColumnDesc aggrExprNode = null;
 
   public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpc() {
@@ -161,11 +165,15 @@ public final class RewriteQueryUsingAggr
     };
   }
 
-  public String getBaseTableName() {
-    return baseTableName;
+  public String getAlias() {
+    return alias;
   }
 
   public String getAggregateFunction() {
     return aggregateFunction;
   }
+
+  public Set<String> getColumns() {
+    return columns;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java Sun Aug 10 01:15:37 2014
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -64,11 +63,11 @@ import org.apache.hadoop.hive.ql.plan.Ta
 public class IndexWhereProcessor implements NodeProcessor {
 
   private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName());
-  private final Map<Table, List<Index>> indexes;
+  private final Map<TableScanOperator, List<Index>> tsToIndices;
 
-  public IndexWhereProcessor(Map<Table, List<Index>> indexes) {
+  public IndexWhereProcessor(Map<TableScanOperator, List<Index>> tsToIndices) {
     super();
-    this.indexes = indexes;
+    this.tsToIndices = tsToIndices;
   }
 
   @Override
@@ -81,9 +80,11 @@ public class IndexWhereProcessor impleme
     TableScanOperator operator = (TableScanOperator) nd;
     List<Node> opChildren = operator.getChildren();
     TableScanDesc operatorDesc = operator.getConf();
-    if (operatorDesc == null) {
+    if (operatorDesc == null || !tsToIndices.containsKey(operator)) {
       return null;
     }
+    List<Index> indexes = tsToIndices.get(operator);
+
     ExprNodeDesc predicate = operatorDesc.getFilterExpr();
 
     IndexWhereProcCtx context = (IndexWhereProcCtx) procCtx;
@@ -96,7 +97,7 @@ public class IndexWhereProcessor impleme
     }
     LOG.info(predicate.getExprString());
 
-    // check if we have indexes on all partitions in this table scan
+    // check if we have indexes on all partitions in this table scan
     Set<Partition> queryPartitions;
     try {
       queryPartitions = IndexUtils.checkPartitionsCoveredByIndex(operator, pctx, indexes);
@@ -118,14 +119,9 @@ public class IndexWhereProcessor impleme
     Map<Index, HiveIndexQueryContext> queryContexts = new HashMap<Index, HiveIndexQueryContext>();
     // make sure we have an index on the table being scanned
     TableDesc tblDesc = operator.getTableDesc();
-    Table srcTable = pctx.getTopToTable().get(operator);
-    if (indexes == null || indexes.get(srcTable) == null) {
-      return null;
-    }
 
-    List<Index> tableIndexes = indexes.get(srcTable);
     Map<String, List<Index>> indexesByType = new HashMap<String, List<Index>>();
-    for (Index indexOnTable : tableIndexes) {
+    for (Index indexOnTable : indexes) {
       if (indexesByType.get(indexOnTable.getIndexHandlerClass()) == null) {
         List<Index> newType = new ArrayList<Index>();
         newType.add(indexOnTable);
@@ -135,7 +131,7 @@ public class IndexWhereProcessor impleme
       }
     }
 
-    // choose index type with most indexes of the same type on the table
+    // choose index type with most indexes of the same type on the table
     // TODO HIVE-2130 This would be a good place for some sort of cost based choice?
     List<Index> bestIndexes = indexesByType.values().iterator().next();
     for (List<Index> indexTypes : indexesByType.values()) {
@@ -179,7 +175,7 @@ public class IndexWhereProcessor impleme
   }
 
   /**
-   * Get a list of Tasks to activate use of indexes.
+   * Get a list of Tasks to activate use of indexes.
    * Generate the tasks for the index query (where we store results of
    * querying the index in a tmp file) inside the IndexHandler
    * @param predicate Predicate of query to rewrite
@@ -193,7 +189,7 @@ public class IndexWhereProcessor impleme
                                 HiveIndexQueryContext queryContext)
                                 throws SemanticException {
     HiveIndexHandler indexHandler;
-    // All indexes in the list are of the same type, and therefore can use the
+    // All indexes in the list are of the same type, and therefore can use the
     // same handler to generate the index query tasks
     Index index = indexes.get(0);
     try {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java Sun Aug 10 01:15:37 2014
@@ -116,12 +116,11 @@ public class IndexWhereTaskDispatcher im
 
     // query the metastore to know what columns we have indexed
     Collection<Table> topTables = pctx.getTopToTable().values();
-    Map<Table, List<Index>> indexes = new HashMap<Table, List<Index>>();
-    for (Table tbl : topTables)
-    {
-      List<Index> tblIndexes = IndexUtils.getIndexes(tbl, supportedIndexes);
+    Map<TableScanOperator, List<Index>> indexes = new HashMap<TableScanOperator, List<Index>>();
+    for (Map.Entry<TableScanOperator, Table> entry : pctx.getTopToTable().entrySet()) {
+      List<Index> tblIndexes = IndexUtils.getIndexes(entry.getValue(), supportedIndexes);
       if (tblIndexes.size() > 0) {
-        indexes.put(tbl, tblIndexes);
+        indexes.put(entry.getKey(), tblIndexes);
       }
     }
 

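Taken together with the IndexWhereProcessor change above, candidate indexes are now keyed by TableScanOperator rather than Table, so two scans of the same table (for example in a self-join) each carry their own index list and a scan with no usable index is skipped outright. A minimal sketch of the consumption side; only tsToIndices and the Hive types are taken from the patch, the loop itself is illustrative:

    // tsToIndices is the map built in the dispatcher loop above
    for (Map.Entry<TableScanOperator, List<Index>> entry : tsToIndices.entrySet()) {
      TableScanOperator scan = entry.getKey();    // one entry per table scan, not per table
      List<Index> candidates = entry.getValue();  // non-empty: guarded by the size() > 0 check
      // each scan gets its own index decision, even when several scans read
      // the same underlying table
    }
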
Modified: hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out?rev=1617039&r1=1617038&r2=1617039&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out Sun Aug 10 01:15:37 2014
@@ -474,9 +474,11 @@ STAGE PLANS:
               key expressions: _col0 (type: int), _col1 (type: int)
               sort order: ++
               Statistics: Num rows: 60 Data size: 6049 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint)
+              value expressions: _col2 (type: bigint)
       Reduce Operator Tree:
-        Extract
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1, _col2
           Statistics: Num rows: 60 Data size: 6049 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -621,9 +623,11 @@ STAGE PLANS:
               key expressions: _col0 (type: int), _col1 (type: int)
               sort order: ++
               Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: bigint)
+              value expressions: _col2 (type: bigint)
       Reduce Operator Tree:
-        Extract
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: int), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1, _col2
           Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -765,24 +769,27 @@ STAGE PLANS:
                   keys: year(l_shipdate) (type: int), month(l_shipdate) (type: int)
                   mode: hash
                   outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int)
-                    sort order: ++
-                    Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
-                    Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
-                    value expressions: _col2 (type: bigint)
+                  Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: _col1 is not null (type: boolean)
+                    Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: int)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                      Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col2 (type: bigint)
       Reduce Operator Tree:
         Group By Operator
           aggregations: sum(VALUE._col0)
           keys: KEY._col0 (type: int), KEY._col1 (type: int)
           mode: mergepartial
           outputColumnNames: _col0, _col1, _col2
-          Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+          Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
           Select Operator
             expressions: _col1 (type: int), _col2 (type: bigint)
             outputColumnNames: _col1, _col2
-            Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
             File Output Operator
               compressed: false
               table:
@@ -798,31 +805,31 @@ STAGE PLANS:
               key expressions: _col1 (type: int)
               sort order: +
               Map-reduce partition columns: _col1 (type: int)
-              Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col1 (type: int), _col2 (type: bigint)
+              Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col2 (type: bigint)
           TableScan
             Reduce Output Operator
               key expressions: _col1 (type: int)
               sort order: +
               Map-reduce partition columns: _col1 (type: int)
-              Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col1 (type: int), _col2 (type: bigint)
+              Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col2 (type: bigint)
       Reduce Operator Tree:
         Join Operator
           condition map:
                Inner Join 0 to 1
           condition expressions:
-            0 {VALUE._col1} {VALUE._col2}
-            1 {VALUE._col1} {VALUE._col2}
+            0 {KEY.reducesinkkey0} {VALUE._col1}
+            1 {KEY.reducesinkkey0} {VALUE._col1}
           outputColumnNames: _col1, _col2, _col4, _col5
-          Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE
+          Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE
           Select Operator
             expressions: _col1 (type: int), _col4 (type: int), ((_col5 - _col2) / _col2) (type: double)
             outputColumnNames: _col0, _col1, _col2
-            Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE
             File Output Operator
               compressed: false
-              Statistics: Num rows: 25 Data size: 2308 Basic stats: COMPLETE Column stats: NONE
+              Statistics: Num rows: 6 Data size: 601 Basic stats: COMPLETE Column stats: NONE
               table:
                   input format: org.apache.hadoop.mapred.TextInputFormat
                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -846,24 +853,27 @@ STAGE PLANS:
                   keys: year(l_shipdate) (type: int), month(l_shipdate) (type: int)
                   mode: hash
                   outputColumnNames: _col0, _col1, _col2
-                  Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: int), _col1 (type: int)
-                    sort order: ++
-                    Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
-                    Statistics: Num rows: 47 Data size: 4291 Basic stats: COMPLETE Column stats: NONE
-                    value expressions: _col2 (type: bigint)
+                  Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: _col1 is not null (type: boolean)
+                    Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: int)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: int), _col1 (type: int)
+                      Statistics: Num rows: 12 Data size: 1095 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col2 (type: bigint)
       Reduce Operator Tree:
         Group By Operator
           aggregations: sum(VALUE._col0)
           keys: KEY._col0 (type: int), KEY._col1 (type: int)
           mode: mergepartial
           outputColumnNames: _col0, _col1, _col2
-          Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+          Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
           Select Operator
             expressions: _col1 (type: int), _col2 (type: bigint)
             outputColumnNames: _col1, _col2
-            Statistics: Num rows: 23 Data size: 2099 Basic stats: COMPLETE Column stats: NONE
+            Statistics: Num rows: 6 Data size: 547 Basic stats: COMPLETE Column stats: NONE
             File Output Operator
               compressed: false
               table:
@@ -890,16 +900,16 @@ select l_shipdate, l_orderkey as cnt
 from lineitem) dummy
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-3 is a root stage
+  Stage-2 depends on stages: Stage-3
   Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
-  Stage: Stage-1
+  Stage: Stage-3
     Map Reduce
       Map Operator Tree:
           TableScan
-            alias: null-subquery1:default__lineitem_lineitem_lshipdate_idx__
+            alias: null-subquery1:dummy-subquery1:default__lineitem_lineitem_lshipdate_idx__
             Statistics: Num rows: 95 Data size: 8675 Basic stats: COMPLETE Column stats: NONE
             Select Operator
               expressions: l_shipdate (type: string), _count_of_l_shipdate (type: bigint)
@@ -939,20 +949,6 @@ STAGE PLANS:
     Map Reduce
       Map Operator Tree:
           TableScan
-            Union
-              Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
-              Select Operator
-                expressions: _col0 (type: string), _col1 (type: bigint)
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-          TableScan
             alias: lineitem
             Statistics: Num rows: 116 Data size: 12099 Basic stats: COMPLETE Column stats: NONE
             Select Operator
@@ -972,6 +968,20 @@ STAGE PLANS:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          TableScan
+            Union
+              Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: _col0 (type: string), _col1 (type: bigint)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 163 Data size: 16390 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -1016,17 +1026,17 @@ STAGE PLANS:
     Map Reduce
       Map Operator Tree:
           TableScan
-            alias: default__tbl_tbl_key_idx__
+            alias: tbl
             Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
             Filter Operator
               predicate: (key = 1) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int), _count_of_key (type: bigint)
-                outputColumnNames: key, _count_of_key
+                expressions: 1 (type: int)
+                outputColumnNames: key
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
-                  aggregations: sum(_count_of_key)
+                  aggregations: count(key)
                   keys: key (type: int)
                   mode: hash
                   outputColumnNames: _col0, _col1
@@ -1039,7 +1049,7 @@ STAGE PLANS:
                     value expressions: _col1 (type: bigint)
       Reduce Operator Tree:
         Group By Operator
-          aggregations: sum(VALUE._col0)
+          aggregations: count(VALUE._col0)
           keys: KEY._col0 (type: int)
           mode: mergepartial
           outputColumnNames: _col0, _col1
@@ -1345,7 +1355,7 @@ STAGE PLANS:
               predicate: (key = 3) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int)
+                expressions: 3 (type: int)
                 outputColumnNames: key
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
@@ -1563,7 +1573,7 @@ STAGE PLANS:
               predicate: (value = 1) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int), value (type: int)
+                expressions: key (type: int), 1 (type: int)
                 outputColumnNames: key, value
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
@@ -1831,7 +1841,7 @@ STAGE PLANS:
               predicate: (value = 2) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int), value (type: int)
+                expressions: key (type: int), 2 (type: int)
                 outputColumnNames: key, value
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
@@ -1887,7 +1897,7 @@ STAGE PLANS:
               predicate: ((value = 2) and (key = 3)) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int), value (type: int)
+                expressions: 3 (type: int), 2 (type: int)
                 outputColumnNames: key, value
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
@@ -2108,7 +2118,7 @@ STAGE PLANS:
               predicate: (value = 2) (type: boolean)
               Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
               Select Operator
-                expressions: key (type: int), value (type: int)
+                expressions: key (type: int), 2 (type: int)
                 outputColumnNames: key, value
                 Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Group By Operator
@@ -2489,9 +2499,11 @@ STAGE PLANS:
               key expressions: _col0 (type: int)
               sort order: +
               Statistics: Num rows: 8 Data size: 32 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col0 (type: int), _col1 (type: bigint)
+              value expressions: _col1 (type: bigint)
       Reduce Operator Tree:
-        Extract
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1
           Statistics: Num rows: 8 Data size: 32 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false
@@ -2579,9 +2591,11 @@ STAGE PLANS:
               key expressions: _col0 (type: int)
               sort order: +
               Statistics: Num rows: 3 Data size: 215 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col0 (type: int), _col1 (type: bigint)
+              value expressions: _col1 (type: bigint)
       Reduce Operator Tree:
-        Extract
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1
           Statistics: Num rows: 3 Data size: 215 Basic stats: COMPLETE Column stats: NONE
           File Output Operator
             compressed: false