You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text copy.)
Posted to commits@hive.apache.org by jp...@apache.org on 2014/12/08 06:08:26 UTC
svn commit: r1643736 - in /hive/trunk: itests/src/test/resources/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
Author: jpullokk
Date: Mon Dec 8 05:08:25 2014
New Revision: 1643736
URL: http://svn.apache.org/r1643736
Log:
Rolled back to 1643551
Removed:
hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx_cbo_1.q
hive/trunk/ql/src/test/queries/clientpositive/ql_rewrite_gbtoidx_cbo_2.q
hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx_cbo_1.q.out
hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx_cbo_2.q.out
Modified:
hive/trunk/itests/src/test/resources/testconfiguration.properties
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
Modified: hive/trunk/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/trunk/itests/src/test/resources/testconfiguration.properties?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/trunk/itests/src/test/resources/testconfiguration.properties Mon Dec 8 05:08:25 2014
@@ -31,8 +31,6 @@ minimr.query.files=auto_sortmerge_join_1
optrstat_groupby.q,\
parallel_orderby.q,\
ql_rewrite_gbtoidx.q,\
- ql_rewrite_gbtoidx_cbo_1.q,\
- ql_rewrite_gbtoidx_cbo_2.q,\
quotedid_smb.q,\
reduce_deduplicate.q,\
remote_script.q,\
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyCtx.java Mon Dec 8 05:08:25 2014
@@ -21,16 +21,17 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -62,31 +63,47 @@ public final class RewriteCanApplyCtx im
}
// Rewrite Variables
- private boolean selClauseColsFetchException = false;
+ private int aggFuncCnt = 0;
private boolean queryHasGroupBy = false;
private boolean aggFuncIsNotCount = false;
- private boolean aggParameterException = false;
-
- //The most important, indexKey
- private String indexKey;
+ private boolean aggFuncColsFetchException = false;
+ private boolean whrClauseColsFetchException = false;
+ private boolean selClauseColsFetchException = false;
+ private boolean gbyKeysFetchException = false;
+ private boolean countOnAllCols = false;
+ private boolean countOfOne = false;
+ private boolean queryHasMultipleTables = false;
+
+ //Data structures that are populated in the RewriteCanApplyProcFactory
+ //methods to check if the index key meets all criteria
+ private Set<String> selectColumnsList = new LinkedHashSet<String>();
+ private Set<String> predicateColumnsList = new LinkedHashSet<String>();
+ private Set<String> gbKeyNameList = new LinkedHashSet<String>();
+ private Set<String> aggFuncColList = new LinkedHashSet<String>();
private final ParseContext parseContext;
private String alias;
private String baseTableName;
private String indexTableName;
private String aggFunction;
-
- private TableScanOperator tableScanOperator;
- private List<SelectOperator> selectOperators;
- private List<GroupByOperator> groupByOperators;
void resetCanApplyCtx(){
+ setAggFuncCnt(0);
setQueryHasGroupBy(false);
setAggFuncIsNotCount(false);
+ setAggFuncColsFetchException(false);
+ setWhrClauseColsFetchException(false);
setSelClauseColsFetchException(false);
+ setGbyKeysFetchException(false);
+ setCountOnAllCols(false);
+ setCountOfOne(false);
+ setQueryHasMultipleTables(false);
+ selectColumnsList.clear();
+ predicateColumnsList.clear();
+ gbKeyNameList.clear();
+ aggFuncColList.clear();
setBaseTableName("");
setAggFunction("");
- setIndexKey("");
}
public boolean isQueryHasGroupBy() {
@@ -117,6 +134,22 @@ public final class RewriteCanApplyCtx im
return aggFunction;
}
+ public void setAggFuncColsFetchException(boolean aggFuncColsFetchException) {
+ this.aggFuncColsFetchException = aggFuncColsFetchException;
+ }
+
+ public boolean isAggFuncColsFetchException() {
+ return aggFuncColsFetchException;
+ }
+
+ public void setWhrClauseColsFetchException(boolean whrClauseColsFetchException) {
+ this.whrClauseColsFetchException = whrClauseColsFetchException;
+ }
+
+ public boolean isWhrClauseColsFetchException() {
+ return whrClauseColsFetchException;
+ }
+
public void setSelClauseColsFetchException(boolean selClauseColsFetchException) {
this.selClauseColsFetchException = selClauseColsFetchException;
}
@@ -125,6 +158,78 @@ public final class RewriteCanApplyCtx im
return selClauseColsFetchException;
}
+ public void setGbyKeysFetchException(boolean gbyKeysFetchException) {
+ this.gbyKeysFetchException = gbyKeysFetchException;
+ }
+
+ public boolean isGbyKeysFetchException() {
+ return gbyKeysFetchException;
+ }
+
+ public void setCountOnAllCols(boolean countOnAllCols) {
+ this.countOnAllCols = countOnAllCols;
+ }
+
+ public boolean isCountOnAllCols() {
+ return countOnAllCols;
+ }
+
+ public void setCountOfOne(boolean countOfOne) {
+ this.countOfOne = countOfOne;
+ }
+
+ public boolean isCountOfOne() {
+ return countOfOne;
+ }
+
+ public void setQueryHasMultipleTables(boolean queryHasMultipleTables) {
+ this.queryHasMultipleTables = queryHasMultipleTables;
+ }
+
+ public boolean isQueryHasMultipleTables() {
+ return queryHasMultipleTables;
+ }
+
+ public Set<String> getSelectColumnsList() {
+ return selectColumnsList;
+ }
+
+ public void setSelectColumnsList(Set<String> selectColumnsList) {
+ this.selectColumnsList = selectColumnsList;
+ }
+
+ public Set<String> getPredicateColumnsList() {
+ return predicateColumnsList;
+ }
+
+ public void setPredicateColumnsList(Set<String> predicateColumnsList) {
+ this.predicateColumnsList = predicateColumnsList;
+ }
+
+ public Set<String> getGbKeyNameList() {
+ return gbKeyNameList;
+ }
+
+ public void setGbKeyNameList(Set<String> gbKeyNameList) {
+ this.gbKeyNameList = gbKeyNameList;
+ }
+
+ public Set<String> getAggFuncColList() {
+ return aggFuncColList;
+ }
+
+ public void setAggFuncColList(Set<String> aggFuncColList) {
+ this.aggFuncColList = aggFuncColList;
+ }
+
+ public int getAggFuncCnt() {
+ return aggFuncCnt;
+ }
+
+ public void setAggFuncCnt(int aggFuncCnt) {
+ this.aggFuncCnt = aggFuncCnt;
+ }
+
public String getAlias() {
return alias;
}
@@ -153,6 +258,15 @@ public final class RewriteCanApplyCtx im
return parseContext;
}
+ public Set<String> getAllColumns() {
+ Set<String> allColumns = new LinkedHashSet<String>(selectColumnsList);
+ allColumns.addAll(predicateColumnsList);
+ allColumns.addAll(gbKeyNameList);
+ allColumns.addAll(aggFuncColList);
+ return allColumns;
+ }
+
+
/**
* This method walks all the nodes starting from topOp TableScanOperator node
* and invokes methods from {@link RewriteCanApplyProcFactory} for each of the rules
@@ -168,14 +282,10 @@ public final class RewriteCanApplyCtx im
void populateRewriteVars(TableScanOperator topOp)
throws SemanticException{
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- //^TS%[(SEL%)|(FIL%)]*GRY%[(FIL%)]*RS%[(FIL%)]*GRY%
- opRules.put(
- new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%[("
- + SelectOperator.getOperatorName() + "%)|(" + FilterOperator.getOperatorName() + "%)]*"
- + GroupByOperator.getOperatorName() + "%[" + FilterOperator.getOperatorName() + "%]*"
- + ReduceSinkOperator.getOperatorName() + "%[" + FilterOperator.getOperatorName()
- + "%]*" + GroupByOperator.getOperatorName() + "%"),
- RewriteCanApplyProcFactory.canApplyOnTableScanOperator(topOp));
+ opRules.put(new RuleRegExp("R1", FilterOperator.getOperatorName() + "%"),
+ RewriteCanApplyProcFactory.canApplyOnFilterOperator(topOp));
+ opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + "%"),
+ RewriteCanApplyProcFactory.canApplyOnGroupByOperator(topOp));
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -213,53 +323,67 @@ public final class RewriteCanApplyCtx im
//Map for base table to index table mapping
//TableScan operator for base table will be modified to read from index table
- private final Map<String, String> baseToIdxTableMap = new HashMap<String, String>();;
-
- public void addTable(String baseTableName, String indexTableName) {
- baseToIdxTableMap.put(baseTableName, indexTableName);
- }
-
- public String findBaseTable(String baseTableName) {
- return baseToIdxTableMap.get(baseTableName);
- }
+ private final Map<String, String> baseToIdxTableMap =
+ new HashMap<String, String>();;
- public String getIndexKey() {
- return indexKey;
- }
-
- public void setIndexKey(String indexKey) {
- this.indexKey = indexKey;
- }
-
- public TableScanOperator getTableScanOperator() {
- return tableScanOperator;
- }
- public void setTableScanOperator(TableScanOperator tableScanOperator) {
- this.tableScanOperator = tableScanOperator;
- }
-
- public List<SelectOperator> getSelectOperators() {
- return selectOperators;
- }
+ public void addTable(String baseTableName, String indexTableName) {
+ baseToIdxTableMap.put(baseTableName, indexTableName);
+ }
- public void setSelectOperators(List<SelectOperator> selectOperators) {
- this.selectOperators = selectOperators;
- }
+ public String findBaseTable(String baseTableName) {
+ return baseToIdxTableMap.get(baseTableName);
+ }
+
+
+ boolean isIndexUsableForQueryBranchRewrite(Index index, Set<String> indexKeyNames){
+
+ //--------------------------------------------
+ //Check if all columns in select list are part of index key columns
+ if (!indexKeyNames.containsAll(selectColumnsList)) {
+ LOG.info("Select list has non index key column : " +
+ " Cannot use index " + index.getIndexName());
+ return false;
+ }
- public List<GroupByOperator> getGroupByOperators() {
- return groupByOperators;
- }
+ //--------------------------------------------
+ // Check if all columns in where predicate are part of index key columns
+ if (!indexKeyNames.containsAll(predicateColumnsList)) {
+ LOG.info("Predicate column ref list has non index key column : " +
+ " Cannot use index " + index.getIndexName());
+ return false;
+ }
- public void setGroupByOperators(List<GroupByOperator> groupByOperators) {
- this.groupByOperators = groupByOperators;
- }
+ //--------------------------------------------
+ // For group by, we need to check if all keys are from index columns
+ // itself. Here GB key order can be different than index columns but that does
+ // not really matter for final result.
+ if (!indexKeyNames.containsAll(gbKeyNameList)) {
+ LOG.info("Group by key has some non-indexed columns, " +
+ " Cannot use index " + index.getIndexName());
+ return false;
+ }
- public void setAggParameterException(boolean aggParameterException) {
- this.aggParameterException = aggParameterException;
- }
+ // If we have agg function (currently only COUNT is supported), check if its inputs are
+ // from index. we currently support only that.
+ if (aggFuncColList.size() > 0) {
+ if (!indexKeyNames.containsAll(aggFuncColList)){
+ LOG.info("Agg Func input is not present in index key columns. Currently " +
+ "only agg func on index columns are supported by rewrite optimization");
+ return false;
+ }
+ }
- public boolean isAggParameterException() {
- return aggParameterException;
+ //Now that we are good to do this optimization, set parameters in context
+ //which would be used by transformation procedure as inputs.
+ if(queryHasGroupBy
+ && aggFuncCnt == 1
+ && !aggFuncIsNotCount){
+ addTable(baseTableName, index.getIndexTableName());
+ }else{
+ LOG.info("No valid criteria met to apply rewrite.");
+ return false;
+ }
+ return true;
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteCanApplyProcFactory.java Mon Dec 8 05:08:25 2014
@@ -18,9 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer.index;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -28,12 +27,13 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
@@ -43,74 +43,154 @@ import java.util.Stack;
*
*/
public final class RewriteCanApplyProcFactory {
- public static CheckTableScanProc canApplyOnTableScanOperator(TableScanOperator topOp) {
- return new CheckTableScanProc();
- }
- private static class CheckTableScanProc implements NodeProcessor {
- public CheckTableScanProc() {
+ /**
+ * Check for conditions in FilterOperator that do not meet rewrite criteria.
+ */
+ private static class CheckFilterProc implements NodeProcessor {
+
+ private TableScanOperator topOp;
+
+ public CheckFilterProc(TableScanOperator topOp) {
+ this.topOp = topOp;
}
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs)
- throws SemanticException {
- RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx) ctx;
- for (Node node : stack) {
- // For table scan operator,
- // check ReferencedColumns to make sure that only the index column is
- // selected for the following operators.
- if (node instanceof TableScanOperator) {
- TableScanOperator ts = (TableScanOperator) node;
- canApplyCtx.setTableScanOperator(ts);
- List<String> selectColumns = ts.getConf().getReferencedColumns();
- if (selectColumns == null || selectColumns.size() != 1) {
- canApplyCtx.setSelClauseColsFetchException(true);
- return null;
- } else {
- canApplyCtx.setIndexKey(selectColumns.get(0));
- }
- } else if (node instanceof SelectOperator) {
- // For select operators in the stack, we just add them
- if (canApplyCtx.getSelectOperators() == null) {
- canApplyCtx.setSelectOperators(new ArrayList<SelectOperator>());
- }
- canApplyCtx.getSelectOperators().add((SelectOperator) node);
- } else if (node instanceof GroupByOperator) {
- if (canApplyCtx.getGroupByOperators() == null) {
- canApplyCtx.setGroupByOperators(new ArrayList<GroupByOperator>());
- }
- // According to the pre-order,
- // the first GroupbyOperator is the one before RS
- // and the second one is the one after RS
- GroupByOperator operator = (GroupByOperator) node;
- canApplyCtx.getGroupByOperators().add(operator);
- if (!canApplyCtx.isQueryHasGroupBy()) {
- canApplyCtx.setQueryHasGroupBy(true);
- GroupByDesc conf = operator.getConf();
- List<AggregationDesc> aggrList = conf.getAggregators();
- if (aggrList == null || aggrList.size() != 1
- || !("count".equals(aggrList.get(0).getGenericUDAFName()))) {
- // In the current implementation, we make sure that only count is
- // in the function
- canApplyCtx.setAggFuncIsNotCount(true);
- return null;
- } else {
- List<ExprNodeDesc> para = aggrList.get(0).getParameters();
- if (para == null || para.size() == 0 || para.size() > 1) {
- canApplyCtx.setAggParameterException(true);
- return null;
- } else {
- ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator,
- (Operator<OperatorDesc>) stack.get(0));
- if (!(expr instanceof ExprNodeColumnDesc)) {
- canApplyCtx.setAggParameterException(true);
- return null;
- }
- }
- }
- }
- }
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ FilterOperator operator = (FilterOperator)nd;
+ RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
+ FilterDesc conf = operator.getConf();
+ //The filter operator should have a predicate of ExprNodeGenericFuncDesc type.
+ //This represents the comparison operator
+ ExprNodeDesc oldengfd = conf.getPredicate();
+ if(oldengfd == null){
+ canApplyCtx.setWhrClauseColsFetchException(true);
+ return null;
+ }
+ ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(oldengfd, operator, topOp);
+ if (backtrack == null) {
+ canApplyCtx.setWhrClauseColsFetchException(true);
+ return null;
+ }
+ //Add the predicate columns to RewriteCanApplyCtx's predColRefs list to check later
+ //if index keys contain all filter predicate columns and vice-a-versa
+ for (String col : backtrack.getCols()) {
+ canApplyCtx.getPredicateColumnsList().add(col);
}
return null;
}
}
+
+ public static CheckFilterProc canApplyOnFilterOperator(TableScanOperator topOp) {
+ return new CheckFilterProc(topOp);
+ }
+
+ /**
+ * Check for conditions in GroupByOperator that do not meet rewrite criteria.
+ *
+ */
+ private static class CheckGroupByProc implements NodeProcessor {
+
+ private TableScanOperator topOp;
+
+ public CheckGroupByProc(TableScanOperator topOp) {
+ this.topOp = topOp;
+ }
+
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ GroupByOperator operator = (GroupByOperator)nd;
+ RewriteCanApplyCtx canApplyCtx = (RewriteCanApplyCtx)ctx;
+ //for each group-by clause in query, only one GroupByOperator of the
+ //GBY-RS-GBY sequence is stored in getGroupOpToInputTables
+ //we need to process only this operator
+ //Also, we do not rewrite for cases when same query branch has multiple group-by constructs
+ if(canApplyCtx.getParseContext().getGroupOpToInputTables().containsKey(operator) &&
+ !canApplyCtx.isQueryHasGroupBy()){
+
+ canApplyCtx.setQueryHasGroupBy(true);
+ GroupByDesc conf = operator.getConf();
+ List<AggregationDesc> aggrList = conf.getAggregators();
+ if(aggrList != null && aggrList.size() > 0){
+ for (AggregationDesc aggregationDesc : aggrList) {
+ canApplyCtx.setAggFuncCnt(canApplyCtx.getAggFuncCnt() + 1);
+ //In the current implementation, we do not support more than 1 agg funcs in group-by
+ if(canApplyCtx.getAggFuncCnt() > 1) {
+ return false;
+ }
+ String aggFunc = aggregationDesc.getGenericUDAFName();
+ if(!("count".equals(aggFunc))){
+ canApplyCtx.setAggFuncIsNotCount(true);
+ return false;
+ }
+ List<ExprNodeDesc> para = aggregationDesc.getParameters();
+ //for a valid aggregation, it needs to have non-null parameter list
+ if (para == null) {
+ canApplyCtx.setAggFuncColsFetchException(true);
+ } else if (para.size() == 0) {
+ //count(*) case
+ canApplyCtx.setCountOnAllCols(true);
+ canApplyCtx.setAggFunction("_count_of_all");
+ } else if (para.size() == 1) {
+ ExprNodeDesc expr = ExprNodeDescUtils.backtrack(para.get(0), operator, topOp);
+ if (expr instanceof ExprNodeColumnDesc){
+ //Add the columns to RewriteCanApplyCtx's selectColumnsList list
+ //to check later if index keys contain all select clause columns
+ //and vice-a-versa. We get the select column 'actual' names only here
+ //if we have a agg func along with group-by
+ //SelectOperator has internal names in its colList data structure
+ canApplyCtx.getSelectColumnsList().add(
+ ((ExprNodeColumnDesc) expr).getColumn());
+ //Add the columns to RewriteCanApplyCtx's aggFuncColList list to check later
+ //if columns contained in agg func are index key columns
+ canApplyCtx.getAggFuncColList().add(
+ ((ExprNodeColumnDesc) expr).getColumn());
+ canApplyCtx.setAggFunction("_count_of_" +
+ ((ExprNodeColumnDesc) expr).getColumn() + "");
+ } else if(expr instanceof ExprNodeConstantDesc) {
+ //count(1) case
+ canApplyCtx.setCountOfOne(true);
+ canApplyCtx.setAggFunction("_count_of_1");
+ }
+ } else {
+ throw new SemanticException("Invalid number of arguments for count");
+ }
+ }
+ }
+
+ //we need to have non-null group-by keys for a valid group-by operator
+ List<ExprNodeDesc> keyList = conf.getKeys();
+ if(keyList == null || keyList.size() == 0){
+ canApplyCtx.setGbyKeysFetchException(true);
+ }
+ for (ExprNodeDesc expr : keyList) {
+ checkExpression(canApplyCtx, expr);
+ }
+ }
+ return null;
+ }
+
+ private void checkExpression(RewriteCanApplyCtx canApplyCtx, ExprNodeDesc expr){
+ if(expr instanceof ExprNodeColumnDesc){
+ //Add the group-by keys to RewriteCanApplyCtx's gbKeyNameList list to check later
+ //if all keys are from index columns
+ canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+ }else if(expr instanceof ExprNodeGenericFuncDesc){
+ ExprNodeGenericFuncDesc funcExpr = (ExprNodeGenericFuncDesc)expr;
+ List<ExprNodeDesc> childExprs = funcExpr.getChildren();
+ for (ExprNodeDesc childExpr : childExprs) {
+ if(childExpr instanceof ExprNodeColumnDesc){
+ canApplyCtx.getGbKeyNameList().addAll(expr.getCols());
+ canApplyCtx.getSelectColumnsList().add(((ExprNodeColumnDesc) childExpr).getColumn());
+ }else if(childExpr instanceof ExprNodeGenericFuncDesc){
+ checkExpression(canApplyCtx, childExpr);
+ }
+ }
+ }
+ }
+ }
+
+ public static CheckGroupByProc canApplyOnGroupByOperator(TableScanOperator topOp) {
+ return new CheckGroupByProc(topOp);
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteGBUsingIndex.java Mon Dec 8 05:08:25 2014
@@ -21,7 +21,10 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.util.StringUtils;
/**
@@ -149,6 +153,10 @@ public class RewriteGBUsingIndex impleme
* @throws SemanticException
*/
boolean shouldApplyOptimization() throws SemanticException {
+ if (ifQueryHasMultipleTables()) {
+ //We do not apply this optimization for this case as of now.
+ return false;
+ }
Map<Table, List<Index>> tableToIndex = getIndexesForRewrite();
if (tableToIndex.isEmpty()) {
LOG.debug("No Valid Index Found to apply Rewrite, " +
@@ -162,14 +170,19 @@ public class RewriteGBUsingIndex impleme
* the tsOpToProcess to apply rewrite later on.
* */
Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+ Map<String, Operator<?>> topOps = parseContext.getTopOps();
+
for (Map.Entry<String, Operator<?>> entry : parseContext.getTopOps().entrySet()) {
+
String alias = entry.getKey();
TableScanOperator topOp = (TableScanOperator) entry.getValue();
+
Table table = topToTable.get(topOp);
List<Index> indexes = tableToIndex.get(table);
if (indexes.isEmpty()) {
continue;
}
+
if (table.isPartitioned()) {
//if base table has partitions, we need to check if index is built for
//all partitions. If not, then we do not apply the optimization
@@ -183,6 +196,7 @@ public class RewriteGBUsingIndex impleme
//if there are no partitions on base table
checkIfRewriteCanBeApplied(alias, topOp, table, indexes);
}
+
return !tsOpToProcess.isEmpty();
}
@@ -199,21 +213,26 @@ public class RewriteGBUsingIndex impleme
Table baseTable, List<Index> indexes) throws SemanticException{
//Context for checking if this optimization can be applied to the input query
RewriteCanApplyCtx canApplyCtx = RewriteCanApplyCtx.getInstance(parseContext);
+
canApplyCtx.setAlias(alias);
canApplyCtx.setBaseTableName(baseTable.getTableName());
canApplyCtx.populateRewriteVars(topOp);
- Map<Index, String> indexTableMap = getIndexToKeysMap(indexes);
- for (Map.Entry<Index, String> entry : indexTableMap.entrySet()) {
+
+ Map<Index, Set<String>> indexTableMap = getIndexToKeysMap(indexes);
+ for (Map.Entry<Index, Set<String>> entry : indexTableMap.entrySet()) {
//we rewrite the original query using the first valid index encountered
//this can be changed if we have a better mechanism to
//decide which index will produce a better rewrite
Index index = entry.getKey();
- String indexKeyName = entry.getValue();
+ Set<String> indexKeyNames = entry.getValue();
//break here if any valid index is found to apply rewrite
- if (canApplyCtx.getIndexKey() != null && canApplyCtx.getIndexKey().equals(indexKeyName)
- && checkIfAllRewriteCriteriaIsMet(canApplyCtx)) {
- canApplyCtx.setAggFunction("_count_of_" + indexKeyName + "");
- canApplyCtx.addTable(canApplyCtx.getBaseTableName(), index.getIndexTableName());
+ if (canApplyCtx.isIndexUsableForQueryBranchRewrite(index, indexKeyNames) &&
+ checkIfAllRewriteCriteriaIsMet(canApplyCtx)) {
+ //check if aggregation function is set.
+ //If not, set it using the only indexed column
+ if (canApplyCtx.getAggFunction() == null) {
+ canApplyCtx.setAggFunction("_count_of_" + StringUtils.join(",", indexKeyNames) + "");
+ }
canApplyCtx.setIndexTableName(index.getIndexTableName());
tsOpToProcess.put(alias, canApplyCtx);
return true;
@@ -223,6 +242,27 @@ public class RewriteGBUsingIndex impleme
}
/**
+ * This block of code iterates over the topToTable map from ParseContext
+ * to determine if the query has a scan over multiple tables.
+ * @return
+ */
+ boolean ifQueryHasMultipleTables(){
+ Map<TableScanOperator, Table> topToTable = parseContext.getTopToTable();
+ Iterator<Table> valuesItr = topToTable.values().iterator();
+ Set<String> tableNameSet = new HashSet<String>();
+ while(valuesItr.hasNext()){
+ Table table = valuesItr.next();
+ tableNameSet.add(table.getTableName());
+ }
+ if(tableNameSet.size() > 1){
+ LOG.debug("Query has more than one table " +
+ "that is not supported with " + getName() + " optimization.");
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Get a list of indexes which can be used for rewrite.
* @return
* @throws SemanticException
@@ -279,16 +319,19 @@ public class RewriteGBUsingIndex impleme
* @return
* @throws SemanticException
*/
- Map<Index, String> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
+ Map<Index, Set<String>> getIndexToKeysMap(List<Index> indexTables) throws SemanticException{
Hive hiveInstance = hiveDb;
- Map<Index, String> indexToKeysMap = new LinkedHashMap<Index, String>();
+ Map<Index, Set<String>> indexToKeysMap = new LinkedHashMap<Index, Set<String>>();
for (int idxCtr = 0; idxCtr < indexTables.size(); idxCtr++) {
+ final Set<String> indexKeyNames = new LinkedHashSet<String>();
Index index = indexTables.get(idxCtr);
//Getting index key columns
StorageDescriptor sd = index.getSd();
List<FieldSchema> idxColList = sd.getCols();
- assert idxColList.size()==1;
- String indexKeyName = idxColList.get(0).getName();
+ for (FieldSchema fieldSchema : idxColList) {
+ indexKeyNames.add(fieldSchema.getName());
+ }
+ assert indexKeyNames.size()==1;
// Check that the index schema is as expected. This code block should
// catch problems of this rewrite breaking when the AggregateIndexHandler
// index is changed.
@@ -312,7 +355,7 @@ public class RewriteGBUsingIndex impleme
// and defer the decision of using a particular index for later
// this is to allow choosing a index if a better mechanism is
// designed later to chose a better rewrite
- indexToKeysMap.put(index, indexKeyName);
+ indexToKeysMap.put(index, indexKeyNames);
}
return indexToKeysMap;
}
@@ -323,11 +366,20 @@ public class RewriteGBUsingIndex impleme
* @throws SemanticException
*
*/
+ @SuppressWarnings("unchecked")
private void rewriteOriginalQuery() throws SemanticException {
- for (RewriteCanApplyCtx canApplyCtx : tsOpToProcess.values()) {
+ Map<String, Operator<?>> topOpMap = parseContext.getTopOps();
+ Iterator<String> tsOpItr = tsOpToProcess.keySet().iterator();
+
+ for (Map.Entry<String, RewriteCanApplyCtx> entry : tsOpToProcess.entrySet()) {
+ String alias = entry.getKey();
+ RewriteCanApplyCtx canApplyCtx = entry.getValue();
+ TableScanOperator topOp = (TableScanOperator) topOpMap.get(alias);
RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx =
- RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb, canApplyCtx);
- rewriteQueryCtx.invokeRewriteQueryProc();
+ RewriteQueryUsingAggregateIndexCtx.getInstance(parseContext, hiveDb,
+ canApplyCtx.getIndexTableName(), canApplyCtx.getAlias(),
+ canApplyCtx.getAllColumns(), canApplyCtx.getAggFunction());
+ rewriteQueryCtx.invokeRewriteQueryProc(topOp);
parseContext = rewriteQueryCtx.getParseContext();
parseContext.setOpParseCtx((LinkedHashMap<Operator<? extends OperatorDesc>,
OpParseContext>) rewriteQueryCtx.getOpc());
@@ -340,20 +392,45 @@ public class RewriteGBUsingIndex impleme
* This method logs the reason for which we cannot apply the rewrite optimization.
* @return
*/
- boolean checkIfAllRewriteCriteriaIsMet(RewriteCanApplyCtx canApplyCtx) {
- if (canApplyCtx.isSelClauseColsFetchException()) {
- LOG.debug("Got exception while locating child col refs for select list, " + "skipping "
- + getName() + " optimization.");
+ boolean checkIfAllRewriteCriteriaIsMet(RewriteCanApplyCtx canApplyCtx){
+ if (canApplyCtx.getAggFuncCnt() > 1){
+ LOG.debug("More than 1 agg funcs: " +
+ "Not supported by " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isAggFuncIsNotCount()){
+ LOG.debug("Agg func other than count is " +
+ "not supported by " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isCountOnAllCols()){
+ LOG.debug("Currently count function needs group by on key columns. This is a count(*) case.,"
+ + "Cannot apply this " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isCountOfOne()){
+ LOG.debug("Currently count function needs group by on key columns. This is a count(1) case.,"
+ + "Cannot apply this " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isAggFuncColsFetchException()){
+ LOG.debug("Got exception while locating child col refs " +
+ "of agg func, skipping " + getName() + " optimization.");
+ return false;
+ }
+ if (canApplyCtx.isWhrClauseColsFetchException()){
+ LOG.debug("Got exception while locating child col refs for where clause, "
+ + "skipping " + getName() + " optimization.");
return false;
}
- if (canApplyCtx.isAggFuncIsNotCount()) {
- LOG.debug("Agg func other than count is " + "not supported by " + getName()
- + " optimization.");
+ if (canApplyCtx.isSelClauseColsFetchException()){
+ LOG.debug("Got exception while locating child col refs for select list, "
+ + "skipping " + getName() + " optimization.");
return false;
}
- if (canApplyCtx.isAggParameterException()) {
- LOG.debug("Got exception while locating parameter refs for aggregation, " + "skipping "
- + getName() + " optimization.");
+ if (canApplyCtx.isGbyKeysFetchException()){
+ LOG.debug("Got exception while locating child col refs for GroupBy key, "
+ + "skipping " + getName() + " optimization.");
return false;
}
return true;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/index/RewriteQueryUsingAggregateIndexCtx.java Mon Dec 8 05:08:25 2014
@@ -19,46 +19,32 @@
package org.apache.hadoop.hive.ql.optimizer.index;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.ColumnPrunerProcFactory;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* RewriteQueryUsingAggregateIndexCtx class stores the
@@ -67,37 +53,37 @@ import org.apache.hadoop.hive.serde2.typ
*/
public final class RewriteQueryUsingAggregateIndexCtx implements NodeProcessorCtx {
- private static final Log LOG = LogFactory.getLog(RewriteQueryUsingAggregateIndexCtx.class.getName());
+
private RewriteQueryUsingAggregateIndexCtx(ParseContext parseContext, Hive hiveDb,
- RewriteCanApplyCtx canApplyCtx) {
+ String indexTableName, String alias, Set<String> columns, String aggregateFunction) {
this.parseContext = parseContext;
this.hiveDb = hiveDb;
- this.canApplyCtx = canApplyCtx;
- this.indexTableName = canApplyCtx.getIndexTableName();
- this.alias = canApplyCtx.getAlias();
- this.aggregateFunction = canApplyCtx.getAggFunction();
+ this.indexTableName = indexTableName;
+ this.alias = alias;
+ this.aggregateFunction = aggregateFunction;
+ this.columns = columns;
this.opc = parseContext.getOpParseCtx();
- this.indexKey = canApplyCtx.getIndexKey();
}
public static RewriteQueryUsingAggregateIndexCtx getInstance(ParseContext parseContext,
- Hive hiveDb, RewriteCanApplyCtx canApplyCtx) {
+ Hive hiveDb, String indexTableName, String alias,
+ Set<String> columns, String aggregateFunction) {
return new RewriteQueryUsingAggregateIndexCtx(
- parseContext, hiveDb, canApplyCtx);
+ parseContext, hiveDb, indexTableName, alias, columns, aggregateFunction);
}
+
private Map<Operator<? extends OperatorDesc>, OpParseContext> opc =
new LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>();
private final Hive hiveDb;
private final ParseContext parseContext;
- private RewriteCanApplyCtx canApplyCtx;
//We need the GenericUDAFEvaluator for GenericUDAF function "sum"
private GenericUDAFEvaluator eval = null;
private final String indexTableName;
private final String alias;
private final String aggregateFunction;
+ private final Set<String> columns;
private ExprNodeColumnDesc aggrExprNode = null;
- private String indexKey;
public Map<Operator<? extends OperatorDesc>, OpParseContext> getOpc() {
return opc;
@@ -131,6 +117,54 @@ public final class RewriteQueryUsingAggr
return aggrExprNode;
}
+ /**
+ * Walk the original operator tree using the {@link DefaultGraphWalker} using the rules.
+ * Each of the rules invoke respective methods from the {@link RewriteQueryUsingAggregateIndex}
+ * to rewrite the original query using aggregate index.
+ *
+ * @param topOp
+ * @throws SemanticException
+ */
+ public void invokeRewriteQueryProc(
+ Operator<? extends OperatorDesc> topOp) throws SemanticException{
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+ // replace scan operator containing original table with index table
+ opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + "%"),
+ RewriteQueryUsingAggregateIndex.getReplaceTableScanProc());
+ //rule that replaces index key selection with
+ //sum(`_count_of_indexed_column`) function in original query
+ opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + "%"),
+ RewriteQueryUsingAggregateIndex.getNewQuerySelectSchemaProc());
+ //Manipulates the ExprNodeDesc from GroupByOperator aggregation list
+ opRules.put(new RuleRegExp("R3", GroupByOperator.getOperatorName() + "%"),
+ RewriteQueryUsingAggregateIndex.getNewQueryGroupbySchemaProc());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, this);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of topop nodes
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.add(topOp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ /**
+ * Default procedure for {@link DefaultRuleDispatcher}.
+ * @return
+ */
+ private NodeProcessor getDefaultProc() {
+ return new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ return null;
+ }
+ };
+ }
+
public String getAlias() {
return alias;
}
@@ -139,215 +173,7 @@ public final class RewriteQueryUsingAggr
return aggregateFunction;
}
- public String getIndexKey() {
- return indexKey;
- }
-
- public void setIndexKey(String indexKey) {
- this.indexKey = indexKey;
- }
-
- public void invokeRewriteQueryProc() throws SemanticException {
- this.replaceTableScanProcess(canApplyCtx.getTableScanOperator());
- //We need aggrExprNode. Thus, replaceGroupByOperatorProcess should come before replaceSelectOperatorProcess
- for (int index = 0; index < canApplyCtx.getGroupByOperators().size(); index++) {
- this.replaceGroupByOperatorProcess(canApplyCtx.getGroupByOperators().get(index), index);
- }
- for (SelectOperator selectperator : canApplyCtx.getSelectOperators()) {
- this.replaceSelectOperatorProcess(selectperator);
- }
- }
-
- /**
- * This method replaces the original TableScanOperator with the new
- * TableScanOperator and metadata that scans over the index table rather than
- * scanning over the original table.
- *
- */
- private void replaceTableScanProcess(TableScanOperator scanOperator) throws SemanticException {
- RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
- String alias = rewriteQueryCtx.getAlias();
-
- // Need to remove the original TableScanOperators from these data structures
- // and add new ones
- Map<TableScanOperator, Table> topToTable = rewriteQueryCtx.getParseContext().getTopToTable();
- Map<String, Operator<? extends OperatorDesc>> topOps = rewriteQueryCtx.getParseContext()
- .getTopOps();
- Map<Operator<? extends OperatorDesc>, OpParseContext> opParseContext = rewriteQueryCtx
- .getParseContext().getOpParseCtx();
-
- // need this to set rowResolver for new scanOperator
- OpParseContext operatorContext = opParseContext.get(scanOperator);
-
- // remove original TableScanOperator
- topOps.remove(alias);
- topToTable.remove(scanOperator);
- opParseContext.remove(scanOperator);
-
- // construct a new descriptor for the index table scan
- TableScanDesc indexTableScanDesc = new TableScanDesc();
- indexTableScanDesc.setGatherStats(false);
-
- String indexTableName = rewriteQueryCtx.getIndexName();
- Table indexTableHandle = null;
- try {
- indexTableHandle = rewriteQueryCtx.getHiveDb().getTable(indexTableName);
- } catch (HiveException e) {
- LOG.error("Error while getting the table handle for index table.");
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
- }
-
- String k = indexTableName + Path.SEPARATOR;
- indexTableScanDesc.setStatsAggPrefix(k);
- scanOperator.setConf(indexTableScanDesc);
-
- // Construct the new RowResolver for the new TableScanOperator
- RowResolver rr = new RowResolver();
- try {
- StructObjectInspector rowObjectInspector = (StructObjectInspector) indexTableHandle
- .getDeserializer().getObjectInspector();
- StructField field = rowObjectInspector.getStructFieldRef(rewriteQueryCtx.getIndexKey());
- rr.put(indexTableName, field.getFieldName(), new ColumnInfo(field.getFieldName(),
- TypeInfoUtils.getTypeInfoFromObjectInspector(field.getFieldObjectInspector()),
- indexTableName, false));
- } catch (SerDeException e) {
- LOG.error("Error while creating the RowResolver for new TableScanOperator.");
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
- }
-
- // Set row resolver for new table
- operatorContext.setRowResolver(rr);
-
- String newAlias = indexTableName;
- int index = alias.lastIndexOf(":");
- if (index >= 0) {
- newAlias = alias.substring(0, index) + ":" + indexTableName;
- }
-
- // Scan operator now points to other table
- topToTable.put(scanOperator, indexTableHandle);
- scanOperator.getConf().setAlias(newAlias);
- scanOperator.setAlias(indexTableName);
- topOps.put(newAlias, scanOperator);
- opParseContext.put(scanOperator, operatorContext);
- rewriteQueryCtx.getParseContext().setTopToTable((HashMap<TableScanOperator, Table>) topToTable);
- rewriteQueryCtx.getParseContext().setTopOps(
- (HashMap<String, Operator<? extends OperatorDesc>>) topOps);
- rewriteQueryCtx.getParseContext().setOpParseCtx(
- (LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext>) opParseContext);
-
- ColumnPrunerProcFactory.setupNeededColumns(scanOperator, rr,
- Arrays.asList(rewriteQueryCtx.getIndexKey()));
- }
-
- /**
- * This method replaces the original SelectOperator with the new
- * SelectOperator with a new column indexed_key_column.
- */
- private void replaceSelectOperatorProcess(SelectOperator operator) throws SemanticException {
- RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
- // we need to set the colList, outputColumnNames, colExprMap,
- // rowSchema for only that SelectOperator which precedes the GroupByOperator
- // count(indexed_key_column) needs to be replaced by
- // sum(`_count_of_indexed_key_column`)
- List<ExprNodeDesc> selColList = operator.getConf().getColList();
- selColList.add(rewriteQueryCtx.getAggrExprNode());
-
- List<String> selOutputColNames = operator.getConf().getOutputColumnNames();
- selOutputColNames.add(rewriteQueryCtx.getAggrExprNode().getColumn());
-
- operator.getColumnExprMap().put(rewriteQueryCtx.getAggrExprNode().getColumn(),
- rewriteQueryCtx.getAggrExprNode());
-
- RowSchema selRS = operator.getSchema();
- List<ColumnInfo> selRSSignature = selRS.getSignature();
- // Need to create a new type for Column[_count_of_indexed_key_column] node
- PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo("bigint");
- pti.setTypeName("bigint");
- ColumnInfo newCI = new ColumnInfo(rewriteQueryCtx.getAggregateFunction(), pti, "", false);
- selRSSignature.add(newCI);
- selRS.setSignature((ArrayList<ColumnInfo>) selRSSignature);
- operator.setSchema(selRS);
- }
-
- /**
- * We need to replace the count(indexed_column_key) GenericUDAF aggregation
- * function for group-by construct to "sum" GenericUDAF. This method creates a
- * new operator tree for a sample query that creates a GroupByOperator with
- * sum aggregation function and uses that GroupByOperator information to
- * replace the original GroupByOperator aggregation information. It replaces
- * the AggregationDesc (aggregation descriptor) of the old GroupByOperator
- * with the new Aggregation Desc of the new GroupByOperator.
- * @return
- */
- private void replaceGroupByOperatorProcess(GroupByOperator operator, int index)
- throws SemanticException {
- RewriteQueryUsingAggregateIndexCtx rewriteQueryCtx = this;
-
- // We need to replace the GroupByOperator which is before RS
- if (index == 0) {
- // the query contains the sum aggregation GenericUDAF
- String selReplacementCommand = "select sum(`" + rewriteQueryCtx.getAggregateFunction() + "`)"
- + " from " + rewriteQueryCtx.getIndexName() + " group by "
- + rewriteQueryCtx.getIndexKey() + " ";
- // create a new ParseContext for the query to retrieve its operator tree,
- // and the required GroupByOperator from it
- ParseContext newDAGContext = RewriteParseContextGenerator.generateOperatorTree(
- rewriteQueryCtx.getParseContext().getConf(), selReplacementCommand);
-
- // we get our new GroupByOperator here
- Map<GroupByOperator, Set<String>> newGbyOpMap = newDAGContext.getGroupOpToInputTables();
- GroupByOperator newGbyOperator = newGbyOpMap.keySet().iterator().next();
- GroupByDesc oldConf = operator.getConf();
-
- // we need this information to set the correct colList, outputColumnNames
- // in SelectOperator
- ExprNodeColumnDesc aggrExprNode = null;
-
- // Construct the new AggregationDesc to get rid of the current
- // internal names and replace them with new internal names
- // as required by the operator tree
- GroupByDesc newConf = newGbyOperator.getConf();
- List<AggregationDesc> newAggrList = newConf.getAggregators();
- if (newAggrList != null && newAggrList.size() > 0) {
- for (AggregationDesc aggregationDesc : newAggrList) {
- rewriteQueryCtx.setEval(aggregationDesc.getGenericUDAFEvaluator());
- aggrExprNode = (ExprNodeColumnDesc) aggregationDesc.getParameters().get(0);
- rewriteQueryCtx.setAggrExprNode(aggrExprNode);
- }
- }
-
- // Now the GroupByOperator has the new AggregationList;
- // sum(`_count_of_indexed_key`)
- // instead of count(indexed_key)
- OpParseContext gbyOPC = rewriteQueryCtx.getOpc().get(operator);
- RowResolver gbyRR = newDAGContext.getOpParseCtx().get(newGbyOperator).getRowResolver();
- gbyOPC.setRowResolver(gbyRR);
- rewriteQueryCtx.getOpc().put(operator, gbyOPC);
-
- oldConf.setAggregators((ArrayList<AggregationDesc>) newAggrList);
- operator.setConf(oldConf);
-
- } else {
- // we just need to reset the GenericUDAFEvaluator and its name for this
- // GroupByOperator whose parent is the ReduceSinkOperator
- GroupByDesc childConf = (GroupByDesc) operator.getConf();
- List<AggregationDesc> childAggrList = childConf.getAggregators();
- if (childAggrList != null && childAggrList.size() > 0) {
- for (AggregationDesc aggregationDesc : childAggrList) {
- List<ExprNodeDesc> paraList = aggregationDesc.getParameters();
- List<ObjectInspector> parametersOIList = new ArrayList<ObjectInspector>();
- for (ExprNodeDesc expr : paraList) {
- parametersOIList.add(expr.getWritableObjectInspector());
- }
- GenericUDAFEvaluator evaluator = FunctionRegistry.getGenericUDAFEvaluator("sum",
- parametersOIList, false, false);
- aggregationDesc.setGenericUDAFEvaluator(evaluator);
- aggregationDesc.setGenericUDAFName("sum");
- }
- }
- }
+ public Set<String> getColumns() {
+ return columns;
}
}
Modified: hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out?rev=1643736&r1=1643735&r2=1643736&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out Mon Dec 8 05:08:25 2014
@@ -1189,14 +1189,14 @@ STAGE PLANS:
Map Reduce
Map Operator Tree:
TableScan
- alias: default.default__tbl_tbl_key_idx__
+ alias: tbl
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Select Operator
- expressions: key (type: int), _count_of_key (type: bigint)
- outputColumnNames: key, _count_of_key
+ expressions: key (type: int)
+ outputColumnNames: key
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
Group By Operator
- aggregations: sum(_count_of_key)
+ aggregations: count(key)
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
@@ -1206,7 +1206,7 @@ STAGE PLANS:
value expressions: _col0 (type: bigint)
Reduce Operator Tree:
Group By Operator
- aggregations: sum(VALUE._col0)
+ aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE