You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@hive.apache.org by na...@apache.org on 2011/12/09 18:58:22 UTC
svn commit: r1212551 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
test/queries/clientpositive/reduce_deduplicate_exclude_gby.q
test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out
Author: namit
Date: Fri Dec 9 17:58:22 2011
New Revision: 1212551
URL: http://svn.apache.org/viewvc?rev=1212551&view=rev
Log:
HIVE-2329: With map-side aggregation disabled (hive.map.aggr=false), a group-by
that follows a cluster-by on the same key fails to execute (Navis via namit)
Added:
hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q
hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1212551&r1=1212550&r2=1212551&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Fri Dec 9 17:58:22 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Co
import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -64,11 +65,11 @@ import org.apache.hadoop.hive.ql.plan.Se
public class ReduceSinkDeDuplication implements Transform{
protected ParseContext pGraphContext;
-
+
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
pGraphContext = pctx;
-
+
// generate pruned column list for all relevant operators
ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
@@ -88,7 +89,7 @@ public class ReduceSinkDeDuplication imp
ogw.startWalking(topNodes, null);
return pGraphContext;
}
-
+
class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
ParseContext pctx;
List<ReduceSinkOperator> rejectedRSList;
@@ -97,11 +98,11 @@ public class ReduceSinkDeDuplication imp
rejectedRSList = new ArrayList<ReduceSinkOperator>();
this.pctx = pctx;
}
-
+
public boolean contains (ReduceSinkOperator rsOp) {
return rejectedRSList.contains(rsOp);
}
-
+
public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
if (!rejectedRSList.contains(rsOp)) {
rejectedRSList.add(rsOp);
@@ -116,10 +117,10 @@ public class ReduceSinkDeDuplication imp
this.pctx = pctx;
}
}
-
-
+
+
static class ReduceSinkDeduplicateProcFactory {
-
+
public static NodeProcessor getReducerReducerProc() {
return new ReducerReducerProc();
@@ -140,7 +141,7 @@ public class ReduceSinkDeDuplication imp
return null;
}
}
-
+
static class ReducerReducerProc implements NodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack,
@@ -148,11 +149,17 @@ public class ReduceSinkDeDuplication imp
throws SemanticException {
ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
-
+
if(ctx.contains(childReduceSink)) {
return null;
}
-
+
+ List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
+ if (childOp != null && childOp.size() == 1 && childOp.get(0) instanceof GroupByOperator) {
+ ctx.addRejectedReduceSinkOperator(childReduceSink);
+ return null;
+ }
+
ParseContext pGraphContext = ctx.getPctx();
HashMap<String, String> childColumnMapping = getPartitionAndKeyColumnMapping(childReduceSink);
ReduceSinkOperator parentRS = null;
@@ -171,7 +178,7 @@ public class ReduceSinkDeDuplication imp
} else {
stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
}
-
+
boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
if (!succeed) {
return null;
@@ -180,7 +187,7 @@ public class ReduceSinkDeDuplication imp
if (!succeed) {
return null;
}
-
+
boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
if (!same) {
return null;
@@ -193,18 +200,18 @@ public class ReduceSinkDeDuplication imp
ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
List<Operator<? extends Serializable>> parentOp = childReduceSink.getParentOperators();
List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
-
+
Operator<? extends Serializable> oldParent = childReduceSink;
-
+
if (childOp != null && childOp.size() == 1
&& ((childOp.get(0)) instanceof ExtractOperator)) {
oldParent = childOp.get(0);
childOp = childOp.get(0).getChildOperators();
}
-
+
Operator<? extends Serializable> input = parentOp.get(0);
input.getChildOperators().clear();
-
+
RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
@@ -239,9 +246,9 @@ public class ReduceSinkDeDuplication imp
for (Operator<? extends Serializable> ch : childOp) {
ch.replaceParent(oldParent, sel);
}
-
+
}
-
+
private Operator<? extends Serializable> putOpInsertMap(
Operator<? extends Serializable> op, RowResolver rr, ParseContext pGraphContext) {
OpParseContext ctx = new OpParseContext(rr);
@@ -253,16 +260,16 @@ public class ReduceSinkDeDuplication imp
ReduceSinkOperator parentRS,
HashMap<String, String> childColumnMapping,
HashMap<String, String> parentColumnMapping) {
-
+
ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
-
+
boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
childPartitionCols, parentPartitionCols);
if (!ret) {
return false;
}
-
+
ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
ret = compareExprNodes(childColumnMapping, parentColumnMapping,
@@ -270,7 +277,7 @@ public class ReduceSinkDeDuplication imp
if (!ret) {
return false;
}
-
+
String childRSOrder = childReduceSink.getConf().getOrder();
String parentRSOrder = parentRS.getConf().getOrder();
boolean moveChildRSOrderToParent = false;
@@ -285,14 +292,14 @@ public class ReduceSinkDeDuplication imp
moveChildRSOrderToParent = true;
}
}
-
+
int childNumReducers = childReduceSink.getConf().getNumReducers();
int parentNumReducers = parentRS.getConf().getNumReducers();
boolean moveChildReducerNumToParent = false;
//move child reduce sink's number reducers to the parent reduce sink operator.
if (childNumReducers != parentNumReducers) {
if (childNumReducers == -1) {
- //do nothing.
+ //do nothing.
} else if (parentNumReducers == -1) {
//set childNumReducers in the parent reduce sink operator.
moveChildReducerNumToParent = true;
@@ -300,15 +307,15 @@ public class ReduceSinkDeDuplication imp
return false;
}
}
-
+
if(moveChildRSOrderToParent) {
- parentRS.getConf().setOrder(childRSOrder);
+ parentRS.getConf().setOrder(childRSOrder);
}
-
+
if(moveChildReducerNumToParent) {
parentRS.getConf().setNumReducers(childNumReducers);
}
-
+
return true;
}
@@ -316,14 +323,14 @@ public class ReduceSinkDeDuplication imp
HashMap<String, String> parentColumnMapping,
ArrayList<ExprNodeDesc> childColExprs,
ArrayList<ExprNodeDesc> parentColExprs) {
-
+
boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
-
+
if (childEmpty) { //both empty
return true;
}
-
+
//child not empty here
if (parentEmpty) { // child not empty, but parent empty
return false;
@@ -383,13 +390,13 @@ public class ReduceSinkDeDuplication imp
}
}
}
-
+
return true;
}
private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
HashMap<String, String> columnMapping = new HashMap<String, String> ();
- ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
+ ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
if(partitionCols != null) {
@@ -419,7 +426,7 @@ public class ReduceSinkDeDuplication imp
// this potentially is a join operator
return null;
}
-
+
boolean allowed = false;
if ((start instanceof SelectOperator)
|| (start instanceof FilterOperator)
@@ -429,17 +436,17 @@ public class ReduceSinkDeDuplication imp
|| (start instanceof ReduceSinkOperator)) {
allowed = true;
}
-
+
if (!allowed) {
return null;
}
-
+
if ((start instanceof ScriptOperator)
&& !HiveConf.getBoolVar(pGraphContext.getConf(),
HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
return null;
}
-
+
start = start.getParentOperators().get(0);
if(start instanceof ReduceSinkOperator) {
return (ReduceSinkOperator)start;
@@ -448,6 +455,6 @@ public class ReduceSinkDeDuplication imp
return null;
}
}
-
+
}
}
Added: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q?rev=1212551&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q Fri Dec 9 17:58:22 2011
@@ -0,0 +1,6 @@
+create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string);
+
+set hive.map.aggr=false;
+select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1;
+
+drop table t1;
Added: hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out?rev=1212551&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out Fri Dec 9 17:58:22 2011
@@ -0,0 +1,21 @@
+PREHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@t1
+PREHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
+POSTHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1