Posted to commits@hive.apache.org by jv...@apache.org on 2011/09/13 04:20:53 UTC

svn commit: r1170007 [2/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ data/files/ ql/src/java/org/apache/hadoop/hive/ql/index/ ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/ ql/src/java/org/apache/hadoop/hive/ql/index/compact/ ...

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteParseContextGenerator.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.QB;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * RewriteParseContextGenerator is a class that offers methods to generate an operator tree
+ * for input queries. It is modeled on the analyzeInternal(..) method
+ * of {@link SemanticAnalyzer} but creates only the ParseContext for the input query command.
+ * It does not optimize or generate map-reduce tasks for the input query.
+ * This can be used when an operator tree needs to be created for an internal query.
+ *
+ */
+public final class RewriteParseContextGenerator {
+  private static final Log LOG = LogFactory.getLog(RewriteParseContextGenerator.class.getName());
+
+  private RewriteParseContextGenerator(){
+  }
+
+  /**
+   * Parse the input {@link String} command, generate an ASTNode tree, and
+   * semantically analyze it to produce an operator tree.
+   * @param conf configuration used for parsing and semantic analysis
+   * @param command the query command to analyze
+   * @return the {@link ParseContext} for the generated operator tree
+   * @throws SemanticException
+   */
+  public static ParseContext generateOperatorTree(HiveConf conf,
+      String command) throws SemanticException{
+    Context ctx;
+    ParseContext subPCtx = null;
+    try {
+      ctx = new Context(conf);
+      ParseDriver pd = new ParseDriver();
+      ASTNode tree = pd.parse(command, ctx);
+      tree = ParseUtils.findRootNonNullToken(tree);
+
+      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
+      assert(sem instanceof SemanticAnalyzer);
+      doSemanticAnalysis((SemanticAnalyzer) sem, tree, ctx);
+
+      subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+      LOG.info("Sub-query Semantic Analysis Completed");
+    } catch (IOException e) {
+      LOG.error("IOException in generating the operator " +
+        "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    } catch (ParseException e) {
+      LOG.error("ParseException in generating the operator " +
+        "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    } catch (SemanticException e) {
+      LOG.error("SemanticException in generating the operator " +
+        "tree for input command - " + command + " " , e);
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      throw new SemanticException(e.getMessage(), e);
+    }
+    return subPCtx;
+
+  }
+
+  /**
+   * For the input ASTNode tree, perform semantic analysis, check metadata, and
+   * generate an operator tree; the {@link ParseContext} for the operator tree
+   * is then available from the analyzer.
+   *
+   * @param sem the semantic analyzer to drive
+   * @param ast root of the ASTNode tree to analyze
+   * @param ctx query context
+   * @throws SemanticException
+   */
+  private static void doSemanticAnalysis(SemanticAnalyzer sem,
+      ASTNode ast, Context ctx) throws SemanticException {
+    QB qb = new QB(null, null, false);
+    ASTNode child = ast;
+    ParseContext subPCtx = ((SemanticAnalyzer) sem).getParseContext();
+    subPCtx.setContext(ctx);
+    ((SemanticAnalyzer) sem).init(subPCtx);
+
+    LOG.info("Starting Sub-query Semantic Analysis");
+    sem.doPhase1(child, qb, sem.initPhase1Ctx());
+    LOG.info("Completed phase 1 of Sub-query Semantic Analysis");
+
+    sem.getMetaData(qb);
+    LOG.info("Completed getting MetaData in Sub-query Semantic Analysis");
+
+    LOG.info("Sub-query Abstract syntax tree: " + ast.toStringTree());
+    sem.genPlan(qb);
+
+    LOG.info("Completed Sub-query plan generation");
+  }
+
+}
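
A minimal usage sketch for this helper (the configuration and query string below are illustrative and are not part of the patch):

    // build a HiveConf and hand an internal query to the generator;
    // generateOperatorTree(..) throws SemanticException on parse/analysis errors
    HiveConf conf = new HiveConf(SemanticAnalyzer.class);
    String internalQuery = "select sum(`_count_of_key`) from idx_tbl group by key";  // hypothetical query
    ParseContext subPCtx = RewriteParseContextGenerator.generateOperatorTree(conf, internalQuery);
    // the generated operator tree is reachable from the returned context,
    // e.g. via subPCtx.getTopOps() or subPCtx.getGroupOpToInputTables()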

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndex.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * This class defines a procedure factory used to rewrite the operator plan.
+ * Each processor replaces the necessary base table data structures with
+ * the index table data structures for its operator.
+ */
+public final class RewriteQueryUsingAggregateIndex {
+  private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndex.class.getName());
+  private static RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = null;
+
+  private RewriteQueryUsingAggregateIndex() {
+    //this prevents the class from getting instantiated
+  }
+
+  private static class NewQuerySelectSchemaProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      SelectOperator operator = (SelectOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      List<Operator<? extends Serializable>> childOps = operator.getChildOperators();
+      Operator<? extends Serializable> childOp = childOps.iterator().next();
+
+      //we need to set the colList, outputColumnNames, colExprMap and
+      // rowSchema only for the SelectOperator that precedes the GroupByOperator;
+      // count(indexed_key_column) needs to be replaced by sum(`_count_Of_indexed_key_column`)
+      if (childOp instanceof GroupByOperator){
+        List<ExprNodeDesc> selColList =
+          operator.getConf().getColList();
+        selColList.add(rewriteQueryCtx.getAggrExprNode());
+
+        List<String> selOutputColNames =
+          operator.getConf().getOutputColumnNames();
+        selOutputColNames.add(rewriteQueryCtx.getAggrExprNode().getColumn());
+
+        RowSchema selRS = operator.getSchema();
+        List<ColumnInfo> selRSSignature =
+          selRS.getSignature();
+        //Need to create a new type for Column[_count_Of_indexed_key_column] node
+        PrimitiveTypeInfo pti = (PrimitiveTypeInfo) TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+        ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false);
+        selRSSignature.add(newCI);
+        selRS.setSignature((ArrayList<ColumnInfo>) selRSSignature);
+        operator.setSchema(selRS);
+      }
+      return null;
+    }
+  }
+
+  public static NewQuerySelectSchemaProc getNewQuerySelectSchemaProc(){
+    return new NewQuerySelectSchemaProc();
+  }
+
+
+  /**
+   * This processor replaces the original TableScanOperator with
+   * the new TableScanOperator and metadata that scans over the
+   * index table rather than scanning over the original table.
+   *
+   */
+  private static class ReplaceTableScanOpProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      TableScanOperator scanOperator = (TableScanOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+      String baseTableName = rewriteQueryCtx.getBaseTableName();
+      String alias = null;
+      if(baseTableName.contains(":")){
+        alias = (baseTableName.split(":"))[0];
+      }
+
+      //Need to remove the original TableScanOperators from these data structures
+      // and add new ones
+      Map<TableScanOperator, Table>  topToTable =
+        rewriteQueryCtx.getParseContext().getTopToTable();
+      Map<String, Operator<? extends Serializable>>  topOps =
+        rewriteQueryCtx.getParseContext().getTopOps();
+      Map<Operator<? extends Serializable>, OpParseContext>  opParseContext =
+        rewriteQueryCtx.getParseContext().getOpParseCtx();
+
+      //need this to set rowResolver for new scanOperator
+      OpParseContext operatorContext = opParseContext.get(scanOperator);
+
+      //remove original TableScanOperator
+      topToTable.remove(scanOperator);
+      topOps.remove(baseTableName);
+      opParseContext.remove(scanOperator);
+
+      //construct a new descriptor for the index table scan
+      TableScanDesc indexTableScanDesc = new TableScanDesc();
+      indexTableScanDesc.setGatherStats(false);
+
+      String indexTableName = rewriteQueryCtx.getIndexName();
+      Table indexTableHandle = null;
+      try {
+        indexTableHandle = rewriteQueryCtx.getHiveDb().getTable(indexTableName);
+      } catch (HiveException e) {
+        LOG.error("Error while getting the table handle for index table.");
+        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new SemanticException(e.getMessage(), e);
+      }
+
+      String k = indexTableName + Path.SEPARATOR;
+      indexTableScanDesc.setStatsAggPrefix(k);
+      scanOperator.setConf(indexTableScanDesc);
+
+      //Construct the new RowResolver for the new TableScanOperator
+      RowResolver rr = new RowResolver();
+      try {
+        StructObjectInspector rowObjectInspector =
+          (StructObjectInspector) indexTableHandle.getDeserializer().getObjectInspector();
+        List<? extends StructField> fields = rowObjectInspector
+        .getAllStructFieldRefs();
+        for (int i = 0; i < fields.size(); i++) {
+          rr.put(indexTableName, fields.get(i).getFieldName(), new ColumnInfo(fields
+              .get(i).getFieldName(), TypeInfoUtils
+              .getTypeInfoFromObjectInspector(fields.get(i)
+                  .getFieldObjectInspector()), indexTableName, false));
+        }
+      } catch (SerDeException e) {
+        LOG.error("Error while creating the RowResolver for new TableScanOperator.");
+        LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+        throw new SemanticException(e.getMessage(), e);
+      }
+
+      //Set row resolver for new table
+      operatorContext.setRowResolver(rr);
+      String tabNameWithAlias = null;
+      if(alias != null){
+        tabNameWithAlias = alias + ":" + indexTableName;
+      }else{
+        tabNameWithAlias = indexTableName;
+      }
+
+      //Scan operator now points to other table
+      topToTable.put(scanOperator, indexTableHandle);
+      scanOperator.getConf().setAlias(tabNameWithAlias);
+      scanOperator.setAlias(indexTableName);
+      topOps.put(tabNameWithAlias, scanOperator);
+      opParseContext.put(scanOperator, operatorContext);
+      rewriteQueryCtx.getParseContext().setTopToTable(
+          (HashMap<TableScanOperator, Table>) topToTable);
+      rewriteQueryCtx.getParseContext().setTopOps(
+          (HashMap<String, Operator<? extends Serializable>>) topOps);
+      rewriteQueryCtx.getParseContext().setOpParseCtx(
+          (LinkedHashMap<Operator<? extends Serializable>, OpParseContext>) opParseContext);
+
+      return null;
+    }
+  }
+
+  public static ReplaceTableScanOpProc getReplaceTableScanProc(){
+    return new ReplaceTableScanOpProc();
+  }
+
+  /**
+   * We need to replace the count(indexed_column_key) GenericUDAF aggregation function in the
+   * group-by construct with the "sum" GenericUDAF.
+   * This processor creates a new operator tree for a sample query that contains a GroupByOperator
+   * with the sum aggregation function, and uses that GroupByOperator's information to replace
+   * the original GroupByOperator's aggregation information.
+   * It replaces the AggregationDesc (aggregation descriptor) of the old GroupByOperator with the
+   * new AggregationDesc of the new GroupByOperator.
+   */
+  private static class NewQueryGroupbySchemaProc implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+        Object... nodeOutputs) throws SemanticException {
+      GroupByOperator operator = (GroupByOperator)nd;
+      rewriteQueryCtx = (RewriteQueryUsingAggregateIndexCtx)ctx;
+
+      //We need to replace the GroupByOperator which is in
+      //groupOpToInputTables map with the new GroupByOperator
+      if(rewriteQueryCtx.getParseContext().getGroupOpToInputTables().containsKey(operator)){
+        List<ExprNodeDesc> gbyKeyList = operator.getConf().getKeys();
+        String gbyKeys = null;
+        Iterator<ExprNodeDesc> gbyKeyListItr = gbyKeyList.iterator();
+        while(gbyKeyListItr.hasNext()){
+          ExprNodeDesc expr = gbyKeyListItr.next().clone();
+          if(expr instanceof ExprNodeColumnDesc){
+            ExprNodeColumnDesc colExpr = (ExprNodeColumnDesc)expr;
+            //append to the comma-separated key list rather than overwriting it,
+            //so queries with more than one group-by key keep all of their keys
+            if(gbyKeys == null){
+              gbyKeys = colExpr.getColumn();
+            }else{
+              gbyKeys = gbyKeys + "," + colExpr.getColumn();
+            }
+          }
+        }
+
+        //the query contains the sum aggregation GenericUDAF
+        String selReplacementCommand = "select sum(`"
+          + rewriteQueryCtx.getAggregateFunction() + "`)"
+          + " from " + rewriteQueryCtx.getIndexName()
+          + " group by " + gbyKeys + " ";
+        //create a new ParseContext for the query to retrieve its operator tree,
+        //and the required GroupByOperator from it
+        ParseContext newDAGContext = RewriteParseContextGenerator.generateOperatorTree(
+            rewriteQueryCtx.getParseContext().getConf(),
+            selReplacementCommand);
+
+        //we get our new GroupByOperator here
+        Map<GroupByOperator, Set<String>> newGbyOpMap = newDAGContext.getGroupOpToInputTables();
+        GroupByOperator newGbyOperator = newGbyOpMap.keySet().iterator().next();
+        GroupByDesc oldConf = operator.getConf();
+
+        //we need this information to set the correct colList, outputColumnNames in SelectOperator
+        ExprNodeColumnDesc aggrExprNode = null;
+
+        //Construct the new AggregationDesc to get rid of the current
+        //internal names and replace them with new internal names
+        //as required by the operator tree
+        GroupByDesc newConf = newGbyOperator.getConf();
+        List<AggregationDesc> newAggrList = newConf.getAggregators();
+        if(newAggrList != null && newAggrList.size() > 0){
+          for (AggregationDesc aggregationDesc : newAggrList) {
+            rewriteQueryCtx.setEval(aggregationDesc.getGenericUDAFEvaluator());
+            aggrExprNode = (ExprNodeColumnDesc)aggregationDesc.getParameters().get(0);
+            rewriteQueryCtx.setAggrExprNode(aggrExprNode);
+          }
+        }
+
+        //Now the GroupByOperator has the new AggregationList; sum(`_count_of_indexed_key`)
+        //instead of count(indexed_key)
+        OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
+        RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
+        gbyOPC.setRowResolver(gbyRR);
+        rewriteQueryCtx.getOpc().put(operator, gbyOPC);
+
+        oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
+        operator.setConf(oldConf);
+      }else{
+        //we just need to reset the GenericUDAFEvaluator and its name for this
+        //GroupByOperator whose parent is the ReduceSinkOperator
+        GroupByDesc childConf = (GroupByDesc) operator.getConf();
+        List<AggregationDesc> childAggrList = childConf.getAggregators();
+        if(childAggrList != null && childAggrList.size() > 0){
+          for (AggregationDesc aggregationDesc : childAggrList) {
+            List<ExprNodeDesc> paraList = aggregationDesc.getParameters();
+            List<TypeInfo> parametersTypeInfoList = new ArrayList<TypeInfo>();
+            for (ExprNodeDesc expr : paraList) {
+              parametersTypeInfoList.add(expr.getTypeInfo());
+            }
+            GenericUDAFEvaluator evaluator = FunctionRegistry.getGenericUDAFEvaluator(
+                "sum", parametersTypeInfoList, false, false);
+            aggregationDesc.setGenericUDAFEvaluator(evaluator);
+            aggregationDesc.setGenericUDAFName("sum");
+          }
+        }
+
+      }
+
+      return null;
+    }
+  }
+
+  public static NewQueryGroupbySchemaProc getNewQueryGroupbySchemaProc(){
+    return new NewQueryGroupbySchemaProc();
+  }
+}
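
To make NewQueryGroupbySchemaProc concrete: for a hypothetical base table t with an aggregate index table idx_tbl storing the precomputed column `_count_of_key`, the replacement command assembled above takes roughly this form (all names here are illustrative):

    // mirrors the selReplacementCommand construction in NewQueryGroupbySchemaProc;
    // aggregateFunction, indexTableName and gbyKeys are hypothetical values
    String aggregateFunction = "_count_of_key";
    String indexTableName = "idx_tbl";
    String gbyKeys = "key";
    String selReplacementCommand = "select sum(`" + aggregateFunction + "`)"
        + " from " + indexTableName
        + " group by " + gbyKeys + " ";
    // yields: select sum(`_count_of_key`) from idx_tbl group by key
    // which stands in for the original: select key, count(key) from t group by key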

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Tue Sep 13 02:20:52 2011
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.index;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+
+/**
+ * RewriteQueryUsingAggregateIndexCtx stores the
+ * context for {@link RewriteQueryUsingAggregateIndex},
+ * which is used to rewrite the operator plan with the index table instead of the base table.
+ */
+
+public final class RewriteQueryUsingAggregateIndexCtx  implements NodeProcessorCtx {
+
+  private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
+      String indexTableName, String baseTableName, String aggregateFunction){
+    this.parseContext = parseContext;
+    this.hiveDb = hiveDb;
+    this.indexTableName = indexTableName;
+    this.baseTableName = baseTableName;
+    this.aggregateFunction = aggregateFunction;
+    this.opc = parseContext.getOpParseCtx();
+  }
+
+  public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
+      Hive hiveDb, String indexTableName, String baseTableName, String aggregateFunction){
+    return new RewriteQueryUsingAggregateIndexCtx(
+        parseContext, hiveDb, indexTableName, baseTableName, aggregateFunction);
+  }
+
+
+  private Map<Operator<? extends Serializable>, OpParseContext> opc =
+    new LinkedHashMap<Operator<? extends Serializable>, OpParseContext>();
+  private final Hive hiveDb;
+  private final ParseContext parseContext;
+  //We need the GenericUDAFEvaluator for GenericUDAF function "sum"
+  private GenericUDAFEvaluator eval = null;
+  private final String indexTableName;
+  private final String baseTableName;
+  private final String aggregateFunction;
+  private ExprNodeColumnDesc aggrExprNode = null;
+
+  public Map<Operator<? extends Serializable>, OpParseContext> getOpc() {
+    return opc;
+  }
+
+  public  ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public Hive getHiveDb() {
+    return hiveDb;
+  }
+
+  public String getIndexName() {
+     return indexTableName;
+  }
+
+  public GenericUDAFEvaluator getEval() {
+    return eval;
+  }
+
+  public void setEval(GenericUDAFEvaluator eval) {
+    this.eval = eval;
+  }
+
+  public void setAggrExprNode(ExprNodeColumnDesc aggrExprNode) {
+    this.aggrExprNode = aggrExprNode;
+  }
+
+  public ExprNodeColumnDesc getAggrExprNode() {
+    return aggrExprNode;
+  }
+
+ /**
+  * Walk the original operator tree using the {@link DefaultGraphWalker} with the rules defined below.
+  * Each rule invokes the corresponding processor from {@link RewriteQueryUsingAggregateIndex}
+  * to rewrite the original query using the aggregate index.
+  *
+  * @param topOp
+  * @throws SemanticException
+  */
+  public void invokeRewriteQueryProc(
+      Operator<? extends Serializable> topOp) throws SemanticException{
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    // replace scan operator containing original table with index table
+    opRules.put(new RuleRegExp("R1", "TS%"),
+        RewriteQueryUsingAggregateIndex.getReplaceTableScanProc());
+    //rule that replaces index key selection with
+    //sum(`_count_of_indexed_column`) function in original query
+    opRules.put(new RuleRegExp("R2", "SEL%"),
+        RewriteQueryUsingAggregateIndex.getNewQuerySelectSchemaProc());
+    //Manipulates the ExprNodeDesc from GroupByOperator aggregation list
+    opRules.put(new RuleRegExp("R3", "GBY%"),
+        RewriteQueryUsingAggregateIndex.getNewQueryGroupbySchemaProc());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, this);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    // Create a list of topop nodes
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.add(topOp);
+    ogw.startWalking(topNodes, null);
+  }
+
+ /**
+  * Default procedure for {@link DefaultRuleDispatcher}.
+  * @return
+  */
+  private NodeProcessor getDefaultProc() {
+    return new NodeProcessor() {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        return null;
+      }
+    };
+  }
+
+  public String getBaseTableName() {
+    return baseTableName;
+  }
+
+  public String getAggregateFunction() {
+    return aggregateFunction;
+  }
+}
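
A sketch of how a driving optimizer might use this context; the actual caller is added in another part of r1170007, so the argument values below are illustrative only:

    // obtain a context for the query being optimized and walk its operator tree
    RewriteQueryUsingAggregateIndexCtx rewriteCtx = RewriteQueryUsingAggregateIndexCtx.getInstance(
        parseContext,              // ParseContext of the original query
        hiveDb,                    // Hive metastore handle
        "default__t_t_key_idx__",  // hypothetical aggregate index table name
        "t",                       // base table name (may carry an "alias:" prefix)
        "_count_of_key");          // aggregate column stored in the index table
    for (Operator<? extends Serializable> topOp : parseContext.getTopOps().values()) {
      rewriteCtx.invokeRewriteQueryProc(topOp);  // throws SemanticException on failure
    }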

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java Tue Sep 13 02:20:52 2011
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.optimi
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -31,11 +30,7 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -46,19 +41,17 @@ import org.apache.hadoop.hive.ql.index.H
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 
 /**
 *
@@ -71,12 +64,10 @@ public class IndexWhereProcessor impleme
 
   private static final Log LOG = LogFactory.getLog(IndexWhereProcessor.class.getName());
   private final Map<Table, List<Index>> indexes;
-  private Map<Index, Table> indexToIndexTable;
 
   public IndexWhereProcessor(Map<Table, List<Index>> indexes) {
     super();
     this.indexes = indexes;
-    this.indexToIndexTable = new HashMap<Index, Table>();
   }
 
   @Override
@@ -107,7 +98,7 @@ public class IndexWhereProcessor impleme
     // check if we have indexes on all partitions in this table scan
     Set<Partition> queryPartitions;
     try {
-      queryPartitions = checkPartitionsCoveredByIndex(operator, pctx);
+      queryPartitions = IndexUtils.checkPartitionsCoveredByIndex(operator, pctx, indexes);
       if (queryPartitions == null) { // partitions not covered
         return null;
       }
@@ -230,157 +221,6 @@ public class IndexWhereProcessor impleme
     return;
   }
 
-  /**
-   * Check the partitions used by the table scan to make sure they also exist in the
-   * index table
-   * @param pctx
-   * @param operator
-   * @return partitions used by query.  null if they do not exist in index table
-   */
-  private Set<Partition> checkPartitionsCoveredByIndex(TableScanOperator tableScan, ParseContext pctx)
-    throws HiveException {
-    Hive hive = Hive.get(pctx.getConf());
-
-
-    // make sure each partition exists on the index table
-    PrunedPartitionList queryPartitionList = pctx.getOpToPartList().get(tableScan);
-    Set<Partition> queryPartitions = queryPartitionList.getConfirmedPartns();
-
-    for (Partition part : queryPartitions) {
-      List<Table> sourceIndexTables = getIndexTables(hive, part);
-      if (!containsPartition(hive, part)) {
-        return null; // problem if it doesn't contain the partition
-      }
-    }
-
-    return queryPartitions;
-  }
-
-  /**
-   * return index tables associated with a given base table
-   */
-  private List<Table> getIndexTables(Hive hive, Table table) throws
-    HiveException {
-    List<Table> indexTables = new ArrayList<Table>();
-    if (indexes == null || indexes.get(table) == null) {
-      return indexTables;
-    }
-    for (Index index : indexes.get(table)) {
-      Table indexTable = hive.getTable(index.getIndexTableName());
-      indexToIndexTable.put(index, indexTable);
-      indexTables.add(indexTable);
-    }
-    return indexTables;
-  }
-
-  /**
-   * return index tables associated with the base table of the partition
-   */
-  private List<Table> getIndexTables(Hive hive, Partition part) throws HiveException {
-    List<Table> indexTables = new ArrayList<Table>();
-    Table partitionedTable = part.getTable();
-    if (indexes == null || indexes.get(partitionedTable) == null) {
-      return indexTables;
-    }
-    for (Index index : indexes.get(partitionedTable)) {
-      Table indexTable = hive.getTable(index.getIndexTableName());
-      indexToIndexTable.put(index, indexTable);
-      indexTables.add(indexTable);
-    }
-    return indexTables;
-  }
-
-  /**
-   * check that every index table contains the given partition and is fresh
-   */
-  private boolean containsPartition(Hive hive, Partition part)
-    throws HiveException {
-    HashMap<String, String> partSpec = part.getSpec();
-
-    if (indexes == null || indexes.get(part.getTable()) == null) {
-      return false;
-    }
-
-    if (partSpec.isEmpty()) {
-      // empty specs come from non-partitioned tables
-      return isIndexTableFresh(hive, indexes.get(part.getTable()), part.getTable());
-    }
-
-    for (Index index : indexes.get(part.getTable())) {
-      Table indexTable = indexToIndexTable.get(index);
-      // get partitions that match the spec
-      List<Partition> matchingPartitions = hive.getPartitions(indexTable, partSpec);
-      if (matchingPartitions == null || matchingPartitions.size() == 0) {
-        LOG.info("Index table " + indexTable + "did not contain built partition that matched " + partSpec);
-        return false;
-      } else if (!isIndexPartitionFresh(hive, index, part)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Check the index partitions on a parttioned table exist and are fresh
-   */
-  private boolean isIndexPartitionFresh(Hive hive, Index index,
-      Partition part) throws HiveException {
-    LOG.info("checking index staleness...");
-    try {
-      FileSystem partFs = part.getPartitionPath().getFileSystem(hive.getConf());
-      FileStatus partFss = partFs.getFileStatus(part.getPartitionPath());
-      String ts = index.getParameters().get(part.getSpec().toString());
-      if (ts == null) {
-        return false;
-      }
-      long indexTs = Long.parseLong(ts);
-      LOG.info(partFss.getModificationTime());
-      LOG.info(ts);
-      if (partFss.getModificationTime() > indexTs) {
-        LOG.info("index is stale on the partitions that matched " + part.getSpec());
-        return false;
-      }
-    } catch (IOException e) {
-      LOG.info("failed to grab timestamp info");
-      throw new HiveException(e);
-    }
-    return true;
-  }
-
-  /**
-   * Check that the indexes on the unpartioned table exist and are fresh
-   */
-  private boolean isIndexTableFresh(Hive hive, List<Index> indexes, Table src)
-    throws HiveException {
-    //check that they exist
-    if (indexes == null || indexes.size() == 0) {
-      return false;
-    }
-    //check that they are not stale
-    for (Index index : indexes) {
-      LOG.info("checking index staleness...");
-      try {
-        FileSystem srcFs = src.getPath().getFileSystem(hive.getConf());
-        FileStatus srcFss= srcFs.getFileStatus(src.getPath());
-        String ts = index.getParameters().get("base_timestamp");
-        if (ts == null) {
-          return false;
-        }
-        long indexTs = Long.parseLong(ts);
-        LOG.info(srcFss.getModificationTime());
-        LOG.info(ts);
-        if (srcFss.getModificationTime() > indexTs) {
-          LOG.info("index is stale ");
-          return false;
-        }
-      } catch (IOException e) {
-        LOG.info("failed to grab timestamp info");
-        throw new HiveException(e);
-      }
-    }
-    return true;
-  }
-
 
   /**
    * Insert the rewrite tasks at the head of the pctx task tree

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java?rev=1170007&r1=1170006&r2=1170007&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java Tue Sep 13 02:20:52 2011
@@ -29,8 +29,8 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler;
 import org.apache.hadoop.hive.ql.index.bitmap.BitmapIndexHandler;
+import org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -40,8 +40,8 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.IndexUtils;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -118,7 +118,7 @@ public class IndexWhereTaskDispatcher im
     Map<Table, List<Index>> indexes = new HashMap<Table, List<Index>>();
     for (Table tbl : topTables)
     {
-      List<Index> tblIndexes = getIndexes(tbl, supportedIndexes);
+      List<Index> tblIndexes = IndexUtils.getIndexes(tbl, supportedIndexes);
       if (tblIndexes.size() > 0) {
         indexes.put(tbl, tblIndexes);
       }
@@ -136,29 +136,6 @@ public class IndexWhereTaskDispatcher im
     return operatorRules;
   }
 
-  /**
-   * Get a list of indexes on a table that match given types.
-   * Copied from HIVE-1694 patch
-   */
-  private List<Index> getIndexes(Table baseTableMetaData, List<String> matchIndexTypes)
-    throws SemanticException {
-    List<Index> matchingIndexes = new ArrayList<Index>();
-    List<Index> indexesOnTable = null;
-
-    try {
-      indexesOnTable = baseTableMetaData.getAllIndexes((short) -1); // get all indexes
-    } catch (HiveException e) {
-      throw new SemanticException("Error accessing metastore", e);
-    }
-
-    for (Index index : indexesOnTable) {
-      String indexType = index.getIndexHandlerClass();
-      if (matchIndexTypes.contains(indexType)) {
-        matchingIndexes.add(index);
-      }
-    }
-    return matchingIndexes;
-  }
 
   private NodeProcessor getDefaultProcessor() {
     return new NodeProcessor() {

Added: hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q?rev=1170007&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx.q Tue Sep 13 02:20:52 2011
@@ -0,0 +1,170 @@
+
+DROP TABLE lineitem;
+CREATE TABLE lineitem (L_ORDERKEY      INT,
+                                L_PARTKEY       INT,
+                                L_SUPPKEY       INT,
+                                L_LINENUMBER    INT,
+                                L_QUANTITY      DOUBLE,
+                                L_EXTENDEDPRICE DOUBLE,
+                                L_DISCOUNT      DOUBLE,
+                                L_TAX           DOUBLE,
+                                L_RETURNFLAG    STRING,
+                                L_LINESTATUS    STRING,
+                                l_shipdate      STRING,
+                                L_COMMITDATE    STRING,
+                                L_RECEIPTDATE   STRING,
+                                L_SHIPINSTRUCT  STRING,
+                                L_SHIPMODE      STRING,
+                                L_COMMENT       STRING)
+ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|';
+
+LOAD DATA LOCAL INPATH '../data/files/lineitem.txt' OVERWRITE INTO TABLE lineitem;
+
+CREATE INDEX lineitem_lshipdate_idx ON TABLE lineitem(l_shipdate) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(l_shipdate)");
+ALTER INDEX lineitem_lshipdate_idx ON lineitem REBUILD;
+
+explain select l_shipdate, count(l_shipdate)
+from lineitem
+group by l_shipdate;
+
+select l_shipdate, count(l_shipdate)
+from lineitem
+group by l_shipdate
+order by l_shipdate;
+
+set hive.optimize.index.groupby=true;
+
+explain select l_shipdate, count(l_shipdate)
+from lineitem
+group by l_shipdate;
+
+select l_shipdate, count(l_shipdate)
+from lineitem
+group by l_shipdate
+order by l_shipdate;
+
+set hive.optimize.index.groupby=false;
+
+
+explain select year(l_shipdate) as year,
+        month(l_shipdate) as month,
+        count(l_shipdate) as monthly_shipments
+from lineitem
+group by year(l_shipdate), month(l_shipdate) 
+order by year, month;
+
+select year(l_shipdate) as year,
+        month(l_shipdate) as month,
+        count(l_shipdate) as monthly_shipments
+from lineitem
+group by year(l_shipdate), month(l_shipdate) 
+order by year, month;
+
+set hive.optimize.index.groupby=true;
+
+explain select year(l_shipdate) as year,
+        month(l_shipdate) as month,
+        count(l_shipdate) as monthly_shipments
+from lineitem
+group by year(l_shipdate), month(l_shipdate) 
+order by year, month;
+
+select year(l_shipdate) as year,
+        month(l_shipdate) as month,
+        count(l_shipdate) as monthly_shipments
+from lineitem
+group by year(l_shipdate), month(l_shipdate) 
+order by year, month;
+
+explain select lastyear.month,
+        thisyear.month,
+        (thisyear.monthly_shipments - lastyear.monthly_shipments) /
+lastyear.monthly_shipments as monthly_shipments_delta
+   from (select year(l_shipdate) as year,
+                month(l_shipdate) as month,
+                count(l_shipdate) as monthly_shipments
+           from lineitem
+          where year(l_shipdate) = 1997
+          group by year(l_shipdate), month(l_shipdate)
+        )  lastyear join
+        (select year(l_shipdate) as year,
+                month(l_shipdate) as month,
+                count(l_shipdate) as monthly_shipments
+           from lineitem
+          where year(l_shipdate) = 1998
+          group by year(l_shipdate), month(l_shipdate)
+        )  thisyear
+  on lastyear.month = thisyear.month;
+
+explain  select l_shipdate, cnt
+from (select l_shipdate, count(l_shipdate) as cnt from lineitem group by l_shipdate
+union all
+select l_shipdate, l_orderkey as cnt
+from lineitem) dummy;
+
+CREATE TABLE tbl(key int, value int);
+CREATE INDEX tbl_key_idx ON TABLE tbl(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)");
+ALTER INDEX tbl_key_idx ON tbl REBUILD;
+
+EXPLAIN select key, count(key) from tbl where key = 1 group by key;
+EXPLAIN select key, count(key) from tbl group by key;
+
+EXPLAIN select count(1) from tbl;
+EXPLAIN select count(key) from tbl;
+
+EXPLAIN select key FROM tbl GROUP BY key;
+EXPLAIN select key FROM tbl GROUP BY value, key;
+EXPLAIN select key FROM tbl WHERE key = 3 GROUP BY key;
+EXPLAIN select key FROM tbl WHERE value = 2 GROUP BY key;
+EXPLAIN select key FROM tbl GROUP BY key, substr(key,2,3);
+
+EXPLAIN select key, value FROM tbl GROUP BY value, key;
+EXPLAIN select key, value FROM tbl WHERE value = 1 GROUP BY key, value;
+
+EXPLAIN select DISTINCT key FROM tbl;
+EXPLAIN select DISTINCT key FROM tbl;
+EXPLAIN select DISTINCT key FROM tbl;
+EXPLAIN select DISTINCT key, value FROM tbl;
+EXPLAIN select DISTINCT key, value FROM tbl WHERE value = 2;
+EXPLAIN select DISTINCT key, value FROM tbl WHERE value = 2 AND key = 3;
+EXPLAIN select DISTINCT key, value FROM tbl WHERE value = key;
+EXPLAIN select DISTINCT key, substr(value,2,3) FROM tbl WHERE value = key;
+EXPLAIN select DISTINCT key, substr(value,2,3) FROM tbl;
+
+EXPLAIN select * FROM (select DISTINCT key, value FROM tbl) v1 WHERE v1.value = 2;
+
+DROP TABLE tbl;
+
+CREATE TABLE tblpart (key int, value string) PARTITIONED BY (ds string, hr int);
+INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-08', hr=11) SELECT key, value FROM srcpart WHERE ds = '2008-04-08' AND hr = 11;
+INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-08', hr=12) SELECT key, value FROM srcpart WHERE ds = '2008-04-08' AND hr = 12;
+INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-09', hr=11) SELECT key, value FROM srcpart WHERE ds = '2008-04-09' AND hr = 11;
+INSERT OVERWRITE TABLE tblpart PARTITION (ds='2008-04-09', hr=12) SELECT key, value FROM srcpart WHERE ds = '2008-04-09' AND hr = 12;
+
+CREATE INDEX tbl_part_index ON TABLE tblpart(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)");
+
+ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-08', hr=11) REBUILD;
+EXPLAIN SELECT key, count(key) FROM tblpart WHERE ds='2008-04-09' AND hr=12 AND key < 10 GROUP BY key;
+
+ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-08', hr=12) REBUILD;
+ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-09', hr=11) REBUILD;
+ALTER INDEX tbl_part_index ON tblpart PARTITION (ds='2008-04-09', hr=12) REBUILD;
+EXPLAIN SELECT key, count(key) FROM tblpart WHERE ds='2008-04-09' AND hr=12 AND key < 10 GROUP BY key;
+
+DROP INDEX tbl_part_index on tblpart;
+DROP TABLE tblpart;
+
+CREATE TABLE tbl(key int, value int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; 
+LOAD DATA LOCAL INPATH '../data/files/tbl.txt' OVERWRITE INTO TABLE tbl;
+
+CREATE INDEX tbl_key_idx ON TABLE tbl(key) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler' WITH DEFERRED REBUILD IDXPROPERTIES("AGGREGATES"="count(key)");
+ALTER INDEX tbl_key_idx ON tbl REBUILD;
+
+set hive.optimize.index.groupby=false;
+explain select key, count(key) from tbl group by key order by key;
+select key, count(key) from tbl group by key order by key;
+set hive.optimize.index.groupby=true;
+explain select key, count(key) from tbl group by key order by key;
+select key, count(key) from tbl group by key order by key;
+DROP TABLE tbl;
\ No newline at end of file