Posted to commits@hive.apache.org by na...@apache.org on 2011/12/09 18:58:22 UTC

svn commit: r1212551 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java test/queries/clientpositive/reduce_deduplicate_exclude_gby.q test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out

Author: namit
Date: Fri Dec  9 17:58:22 2011
New Revision: 1212551

URL: http://svn.apache.org/viewvc?rev=1212551&view=rev
Log:
HIVE-2329 When map aggregation is not used, group-by after cluster-by with
the same key fails to execute (Navis via namit)
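
With hive.map.aggr=false, the reduce-side GroupByOperator depends on its own
ReduceSinkOperator for the shuffle; ReduceSinkDeDuplication would otherwise
merge that ReduceSink with the one produced by the CLUSTER BY, breaking the
group-by. The fix below rejects deduplication whenever the child ReduceSink
feeds directly into a GroupByOperator. The failing pattern, taken verbatim
from the new test case, is:

    create table t1 (key_int1 int, key_int2 int, key_string1 string, key_string2 string);
    set hive.map.aggr=false;
    select Q1.key_int1, sum(Q1.key_int1)
    from (select * from t1 cluster by key_int1) Q1
    group by Q1.key_int1;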


Added:
    hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q
    hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=1212551&r1=1212550&r2=1212551&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Fri Dec  9 17:58:22 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.Co
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -64,11 +65,11 @@ import org.apache.hadoop.hive.ql.plan.Se
 public class ReduceSinkDeDuplication implements Transform{
 
   protected ParseContext pGraphContext;
-  
+
   @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     pGraphContext = pctx;
-    
+
  // generate pruned column list for all relevant operators
     ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
 
@@ -88,7 +89,7 @@ public class ReduceSinkDeDuplication imp
     ogw.startWalking(topNodes, null);
     return pGraphContext;
   }
-  
+
   class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
     ParseContext pctx;
     List<ReduceSinkOperator> rejectedRSList;
@@ -97,11 +98,11 @@ public class ReduceSinkDeDuplication imp
       rejectedRSList = new ArrayList<ReduceSinkOperator>();
       this.pctx = pctx;
     }
-    
+
     public boolean contains (ReduceSinkOperator rsOp) {
       return rejectedRSList.contains(rsOp);
     }
-    
+
     public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
       if (!rejectedRSList.contains(rsOp)) {
         rejectedRSList.add(rsOp);
@@ -116,10 +117,10 @@ public class ReduceSinkDeDuplication imp
       this.pctx = pctx;
     }
   }
-  
-  
+
+
   static class ReduceSinkDeduplicateProcFactory {
-    
+
 
     public static NodeProcessor getReducerReducerProc() {
       return new ReducerReducerProc();
@@ -140,7 +141,7 @@ public class ReduceSinkDeDuplication imp
         return null;
       }
     }
-    
+
     static class ReducerReducerProc implements NodeProcessor {
       @Override
       public Object process(Node nd, Stack<Node> stack,
@@ -148,11 +149,17 @@ public class ReduceSinkDeDuplication imp
           throws SemanticException {
         ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
         ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
-        
+
         if(ctx.contains(childReduceSink)) {
           return null;
         }
-        
+
+        List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
+        if (childOp != null && childOp.size() == 1 && childOp.get(0) instanceof GroupByOperator) {
+          ctx.addRejectedReduceSinkOperator(childReduceSink);
+          return null;
+        }
+
         ParseContext pGraphContext = ctx.getPctx();
         HashMap<String, String> childColumnMapping = getPartitionAndKeyColumnMapping(childReduceSink);
         ReduceSinkOperator parentRS = null;
@@ -171,7 +178,7 @@ public class ReduceSinkDeDuplication imp
         } else {
           stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
         }
-        
+
         boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
         if (!succeed) {
           return null;
@@ -180,7 +187,7 @@ public class ReduceSinkDeDuplication imp
         if (!succeed) {
           return null;
         }
-        
+
         boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
         if (!same) {
           return null;
@@ -193,18 +200,18 @@ public class ReduceSinkDeDuplication imp
           ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
         List<Operator<? extends Serializable>> parentOp = childReduceSink.getParentOperators();
         List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
-        
+
         Operator<? extends Serializable> oldParent = childReduceSink;
-        
+
         if (childOp != null && childOp.size() == 1
             && ((childOp.get(0)) instanceof ExtractOperator)) {
           oldParent = childOp.get(0);
           childOp = childOp.get(0).getChildOperators();
         }
-        
+
         Operator<? extends Serializable> input = parentOp.get(0);
         input.getChildOperators().clear();
-        
+
         RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRowResolver();
 
         ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
@@ -239,9 +246,9 @@ public class ReduceSinkDeDuplication imp
         for (Operator<? extends Serializable> ch : childOp) {
           ch.replaceParent(oldParent, sel);
         }
-        
+
       }
-      
+
       private Operator<? extends Serializable> putOpInsertMap(
           Operator<? extends Serializable> op, RowResolver rr, ParseContext pGraphContext) {
         OpParseContext ctx = new OpParseContext(rr);
@@ -253,16 +260,16 @@ public class ReduceSinkDeDuplication imp
           ReduceSinkOperator parentRS,
           HashMap<String, String> childColumnMapping,
           HashMap<String, String> parentColumnMapping) {
-        
+
         ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
         ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
-        
+
         boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
             childPartitionCols, parentPartitionCols);
         if (!ret) {
           return false;
         }
-        
+
         ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
         ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
         ret = compareExprNodes(childColumnMapping, parentColumnMapping,
@@ -270,7 +277,7 @@ public class ReduceSinkDeDuplication imp
         if (!ret) {
           return false;
         }
-        
+
         String childRSOrder = childReduceSink.getConf().getOrder();
         String parentRSOrder = parentRS.getConf().getOrder();
         boolean moveChildRSOrderToParent = false;
@@ -285,14 +292,14 @@ public class ReduceSinkDeDuplication imp
             moveChildRSOrderToParent = true;
           }
         }
-        
+
         int childNumReducers = childReduceSink.getConf().getNumReducers();
         int parentNumReducers = parentRS.getConf().getNumReducers();
         boolean moveChildReducerNumToParent = false;
         //move child reduce sink's number reducers to the parent reduce sink operator.
         if (childNumReducers != parentNumReducers) {
           if (childNumReducers == -1) {
-            //do nothing. 
+            //do nothing.
           } else if (parentNumReducers == -1) {
             //set childNumReducers in the parent reduce sink operator.
             moveChildReducerNumToParent = true;
@@ -300,15 +307,15 @@ public class ReduceSinkDeDuplication imp
             return false;
           }
         }
-        
+
         if(moveChildRSOrderToParent) {
-          parentRS.getConf().setOrder(childRSOrder);          
+          parentRS.getConf().setOrder(childRSOrder);
         }
-        
+
         if(moveChildReducerNumToParent) {
           parentRS.getConf().setNumReducers(childNumReducers);
         }
-        
+
         return true;
       }
 
@@ -316,14 +323,14 @@ public class ReduceSinkDeDuplication imp
           HashMap<String, String> parentColumnMapping,
           ArrayList<ExprNodeDesc> childColExprs,
           ArrayList<ExprNodeDesc> parentColExprs) {
-        
+
         boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
         boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
-        
+
         if (childEmpty) { //both empty
           return true;
         }
-        
+
         //child not empty here
         if (parentEmpty) { // child not empty, but parent empty
           return false;
@@ -383,13 +390,13 @@ public class ReduceSinkDeDuplication imp
             }
           }
         }
-        
+
         return true;
       }
 
       private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
         HashMap<String, String> columnMapping = new HashMap<String, String> ();
-        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();        
+        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
         ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
         ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
         if(partitionCols != null) {
@@ -419,7 +426,7 @@ public class ReduceSinkDeDuplication imp
             // this potentially is a join operator
             return null;
           }
-          
+
           boolean allowed = false;
           if ((start instanceof SelectOperator)
               || (start instanceof FilterOperator)
@@ -429,17 +436,17 @@ public class ReduceSinkDeDuplication imp
               || (start instanceof ReduceSinkOperator)) {
             allowed = true;
           }
-          
+
           if (!allowed) {
             return null;
           }
-          
+
           if ((start instanceof ScriptOperator)
               && !HiveConf.getBoolVar(pGraphContext.getConf(),
                   HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
             return null;
           }
-          
+
           start = start.getParentOperators().get(0);
           if(start instanceof ReduceSinkOperator) {
             return (ReduceSinkOperator)start;
@@ -448,6 +455,6 @@ public class ReduceSinkDeDuplication imp
         return null;
       }
     }
-    
+
   }
 }
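
The change itself is an early-out at the top of ReducerReducerProc.process():
if the child ReduceSinkOperator's only child is a GroupByOperator, the
ReduceSink is recorded via addRejectedReduceSinkOperator() and deduplication
is skipped, so the reduce-side group-by keeps the shuffle it needs. The new
test files below exercise exactly this case.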

Added: hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q?rev=1212551&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate_exclude_gby.q Fri Dec  9 17:58:22 2011
@@ -0,0 +1,6 @@
+create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string);
+
+set hive.map.aggr=false;
+select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1;
+
+drop table t1;

Added: hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out?rev=1212551&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate_exclude_gby.q.out Fri Dec  9 17:58:22 2011
@@ -0,0 +1,21 @@
+PREHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table t1( key_int1 int, key_int2 int, key_string1 string, key_string2 string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@t1
+PREHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+PREHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
+POSTHOOK: query: select Q1.key_int1, sum(Q1.key_int1) from (select * from t1 cluster by key_int1) Q1 group by Q1.key_int1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: file:/tmp/navis/hive_2011-11-27_23-31-58_414_7329222711647021730/-mr-10000
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1