Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/05 19:26:48 UTC

svn commit: r1643380 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java test/results/clientpositive/spark/parquet_join.q.out

Author: xuefu
Date: Fri Dec  5 18:26:47 2014
New Revision: 1643380

URL: http://svn.apache.org/r1643380
Log:
HIVE-8992: Fix bucket-related test failure: parquet_join.q [Spark Branch] (Jimmy via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1643380&r1=1643379&r2=1643380&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Fri Dec  5 18:26:47 2014
@@ -23,6 +23,8 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -78,6 +81,18 @@ public class SetSparkReducerParallelism
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
       } else {
+        //If it's a FileSink to bucketed files, use the bucket count as the reducer number
+        FileSinkOperator fso = GenSparkUtils.getChildOperator(sink, FileSinkOperator.class);
+        if (fso != null) {
+          String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(
+            hive_metastoreConstants.BUCKET_COUNT);
+          int numBuckets = bucketCount == null ? 0 : Integer.parseInt(bucketCount);
+          if (numBuckets > 0) {
+            LOG.info("Set parallelism for reduce sink " + sink + " to: " + numBuckets);
+            desc.setNumReducers(numBuckets);
+            return false;
+          }
+        }
         try {
           long numberOfBytes = 0;
 

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out?rev=1643380&r1=1643379&r2=1643380&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out Fri Dec  5 18:26:47 2014
@@ -56,25 +56,30 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
 POSTHOOK: Input: default@staging
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@parquet_jointable2
-PREHOOK: query: -- MR join
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+-- MR join
 
 explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
 PREHOOK: type: QUERY
-POSTHOOK: query: -- MR join
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+-- MR join
 
 explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
-  Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2
+  Stage-1 is a root stage
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
-  Stage: Stage-2
+  Stage: Stage-1
     Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 3 (PARTITION-LEVEL SORT, 3)
 #### A masked pattern was here ####
       Vertices:
-        Map 2 
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: p1
@@ -82,21 +87,12 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
-                    Spark HashTable Sink Operator
-                      condition expressions:
-                        0 
-                        1 {myvalue}
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-            Local Work:
-              Map Reduce Local Work
-
-  Stage: Stage-1
-    Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 1 
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: p2
@@ -104,32 +100,33 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      condition expressions:
-                        0 
-                        1 {myvalue}
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col7
-                      input vertices:
-                        0 Map 2
-                      Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
-                      Select Operator
-                        expressions: _col7 (type: string)
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-            Local Work:
-              Map Reduce Local Work
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: myvalue (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                condition expressions:
+                  0 
+                  1 {VALUE._col1}
+                outputColumnNames: _col7
+                Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col7 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -175,7 +172,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 2 
+        Map 1 
             Map Operator Tree:
                 TableScan
                   alias: p1
@@ -197,7 +194,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: p2
@@ -216,7 +213,7 @@ STAGE PLANS:
                         1 key (type: int)
                       outputColumnNames: _col7
                       input vertices:
-                        0 Map 2
+                        0 Map 1
                       Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
                       Select Operator
                         expressions: _col7 (type: string)