Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/05 19:26:48 UTC
svn commit: r1643380 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java test/results/clientpositive/spark/parquet_join.q.out
Author: xuefu
Date: Fri Dec 5 18:26:47 2014
New Revision: 1643380
URL: http://svn.apache.org/r1643380
Log:
HIVE-8992: Fix bucket related test failure: parquet_join.q [Spark Branch] (Jimmy via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1643380&r1=1643379&r2=1643380&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Fri Dec 5 18:26:47 2014
@@ -23,6 +23,8 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -78,6 +81,18 @@ public class SetSparkReducerParallelism
LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
desc.setNumReducers(constantReducers);
} else {
+ //If it's a FileSink to bucketed files, use the bucket count as the reducer number
+ FileSinkOperator fso = GenSparkUtils.getChildOperator(sink, FileSinkOperator.class);
+ if (fso != null) {
+ String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(
+ hive_metastoreConstants.BUCKET_COUNT);
+ int numBuckets = bucketCount == null ? 0 : Integer.parseInt(bucketCount);
+ if (numBuckets > 0) {
+ LOG.info("Set parallelism for reduce sink " + sink + " to: " + numBuckets);
+ desc.setNumReducers(numBuckets);
+ return false;
+ }
+ }
try {
long numberOfBytes = 0;
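In short, the patch short-circuits reducer estimation when the reduce sink feeds a FileSinkOperator that writes bucketed output: the table's declared bucket count becomes the reducer count, and only otherwise does the size-based estimation in the try block run. Below is a minimal standalone sketch of that lookup, assuming a plain java.util.Properties bag in place of Hive's table descriptor; the class and method names are hypothetical illustration, not Hive code.

import java.util.Properties;

public class BucketParallelismSketch {
  // Mirrors hive_metastoreConstants.BUCKET_COUNT, which resolves to the
  // "bucket_count" table property.
  static final String BUCKET_COUNT = "bucket_count";

  // Returns the declared bucket count when the table properties carry a
  // positive one, or -1 to signal "fall back to size-based estimation".
  static int bucketParallelism(Properties tableProps) {
    String bucketCount = tableProps.getProperty(BUCKET_COUNT);
    int numBuckets = bucketCount == null ? 0 : Integer.parseInt(bucketCount);
    return numBuckets > 0 ? numBuckets : -1;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(BUCKET_COUNT, "3"); // a table declared with 3 buckets
    System.out.println(bucketParallelism(props)); // prints 3
  }
}

A reduce sink with no bucketed FileSink child (or a bucket count of 0) takes the existing path: the size-based estimation in the try block that follows the inserted code.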
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out?rev=1643380&r1=1643379&r2=1643380&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out Fri Dec 5 18:26:47 2014
@@ -56,25 +56,30 @@ POSTHOOK: type: CREATETABLE_AS_SELECT
POSTHOOK: Input: default@staging
POSTHOOK: Output: database:default
POSTHOOK: Output: default@parquet_jointable2
-PREHOOK: query: -- MR join
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+-- MR join
explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
PREHOOK: type: QUERY
-POSTHOOK: query: -- MR join
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+-- MR join
explain select p2.myvalue from parquet_jointable1 p1 join parquet_jointable2 p2 on p1.key=p2.key
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
- Stage-2 is a root stage
- Stage-1 depends on stages: Stage-2
+ Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
- Stage: Stage-2
+ Stage: Stage-1
Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 3 (PARTITION-LEVEL SORT, 3)
#### A masked pattern was here ####
Vertices:
- Map 2
+ Map 1
Map Operator Tree:
TableScan
alias: p1
@@ -82,21 +87,12 @@ STAGE PLANS:
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
- Spark HashTable Sink Operator
- condition expressions:
- 0
- 1 {myvalue}
- keys:
- 0 key (type: int)
- 1 key (type: int)
- Local Work:
- Map Reduce Local Work
-
- Stage: Stage-1
- Spark
-#### A masked pattern was here ####
- Vertices:
- Map 1
+ Reduce Output Operator
+ key expressions: key (type: int)
+ sort order: +
+ Map-reduce partition columns: key (type: int)
+ Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+ Map 3
Map Operator Tree:
TableScan
alias: p2
@@ -104,32 +100,33 @@ STAGE PLANS:
Filter Operator
predicate: key is not null (type: boolean)
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- Map Join Operator
- condition map:
- Inner Join 0 to 1
- condition expressions:
- 0
- 1 {myvalue}
- keys:
- 0 key (type: int)
- 1 key (type: int)
- outputColumnNames: _col7
- input vertices:
- 0 Map 2
- Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: _col7 (type: string)
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Local Work:
- Map Reduce Local Work
+ Reduce Output Operator
+ key expressions: key (type: int)
+ sort order: +
+ Map-reduce partition columns: key (type: int)
+ Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
+ value expressions: myvalue (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {VALUE._col1}
+ outputColumnNames: _col7
+ Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col7 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
@@ -175,7 +172,7 @@ STAGE PLANS:
Spark
#### A masked pattern was here ####
Vertices:
- Map 2
+ Map 1
Map Operator Tree:
TableScan
alias: p1
@@ -197,7 +194,7 @@ STAGE PLANS:
Spark
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 2
Map Operator Tree:
TableScan
alias: p2
@@ -216,7 +213,7 @@ STAGE PLANS:
1 key (type: int)
outputColumnNames: _col7
input vertices:
- 0 Map 2
+ 0 Map 1
Statistics: Num rows: 1 Data size: 2 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col7 (type: string)