Posted to commits@hive.apache.org by se...@apache.org on 2016/12/02 21:32:54 UTC
[1/3] hive git commit: HIVE-15323 : allow the user to turn off reduce-side SMB join (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/master 98a25f2d8 -> 2feaa5dc9
HIVE-15323 : allow the user to turn off reduce-side SMB join (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/12130c3e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/12130c3e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/12130c3e
Branch: refs/heads/master
Commit: 12130c3ee71c2499d8224624a0e4cc7109727579
Parents: 2f9728e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 13:22:20 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:27 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 21 ++-
.../annotation/OpTraitsRulesProcFactory.java | 38 ++++-
.../optimizer/spark/SparkMapJoinOptimizer.java | 2 +-
.../apache/hadoop/hive/ql/plan/OpTraits.java | 15 +-
ql/src/test/queries/clientpositive/tez_smb_1.q | 20 +++
.../results/clientpositive/llap/tez_smb_1.q.out | 161 +++++++++++++++++++
7 files changed, 241 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b809562..9064e49 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1506,6 +1506,8 @@ public class HiveConf extends Configuration {
HIVE_AUTO_SORTMERGE_JOIN("hive.auto.convert.sortmerge.join", false,
"Will the join be automatically converted to a sort-merge join, if the joined tables pass the criteria for sort-merge join."),
+ HIVE_AUTO_SORTMERGE_JOIN_REDUCE("hive.auto.convert.sortmerge.join.reduce.side", true,
+ "Whether hive.auto.convert.sortmerge.join (if enabled) should be applied to reduce side."),
HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR(
"hive.auto.convert.sortmerge.join.bigtable.selection.policy",
"org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ",
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 2b93e01..7441f1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -29,6 +29,7 @@ import java.util.Stack;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
@@ -147,7 +148,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true);
// map join operator by default has no bucket cols and num of reduce sinks
// reduced by 1
- mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+ mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks()));
mapJoinOp.setStatistics(joinOp.getStatistics());
// propagate this change till the next RS
for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
@@ -162,7 +163,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
// we cannot convert to bucket map join, we cannot convert to
// map join either based on the size. Check if we can convert to SMB join.
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+ if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
+ || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
+ && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
fallbackToReduceSideJoin(joinOp, context);
return null;
}
@@ -236,9 +239,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
(CommonMergeJoinOperator) OperatorFactory.get(joinOp.getCompilationOpContext(),
new CommonMergeJoinDesc(numBuckets, mapJoinConversionPos, mapJoinDesc),
joinOp.getSchema());
- OpTraits opTraits =
- new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
- .getSortCols());
+ int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks();
+ OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets,
+ joinOp.getOpTraits().getSortCols(), numReduceSinks);
mergeJoinOp.setOpTraits(opTraits);
mergeJoinOp.setStatistics(joinOp.getStatistics());
@@ -304,7 +307,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
if (currentOp instanceof ReduceSinkOperator) {
return;
}
- currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), opTraits.getNumBuckets(), opTraits.getSortCols()));
+ currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(),
+ opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks()));
for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
break;
@@ -331,7 +335,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// we can set the traits for this join operator
OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
- tezBucketJoinProcCtx.getNumBuckets(), null);
+ tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks());
mapJoinOp.setOpTraits(opTraits);
mapJoinOp.setStatistics(joinOp.getStatistics());
setNumberOfBucketsOnChildren(mapJoinOp);
@@ -851,7 +855,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
OpTraits opTraits = new OpTraits(
joinOp.getOpTraits().getBucketColNames(),
numReducers,
- null);
+ null,
+ joinOp.getOpTraits().getNumReduceSinks());
mapJoinOp.setOpTraits(opTraits);
mapJoinOp.setStatistics(joinOp.getStatistics());
// propagate this change till the next RS
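With numReduceSinks now carried in OpTraits, a join whose inputs have passed through two or more reduce sinks counts as reduce-side, and the first hunk above lets the new flag veto SMB conversion for exactly that case. A paraphrase of the guard (not a method in the patch):

// true when convertJoinSMBJoin should fall back to a plain reduce-side join
static boolean skipSmbConversion(boolean smbEnabled, boolean reduceSideSmbEnabled,
    int numReduceSinks) {
  // Either SMB conversion is off entirely, or this join sits on the reduce
  // side (two or more reduce sinks feed it) and the reduce-side flag is off.
  return !smbEnabled || (!reduceSideSmbEnabled && numReduceSinks >= 2);
}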
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
index 1e89016..875ee9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
@@ -106,11 +106,13 @@ public class OpTraitsRulesProcFactory {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
listBucketCols.add(bucketCols);
int numBuckets = -1;
+ int numReduceSinks = 1;
OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getTraits();
if (parentOpTraits != null) {
numBuckets = parentOpTraits.getNumBuckets();
+ numReduceSinks += parentOpTraits.getNumReduceSinks();
}
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks);
rs.setOpTraits(opTraits);
return null;
}
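The ReduceSinkRule above seeds the count: every reduce sink contributes one, on top of whatever its single parent has already accumulated. Condensed (not a method in the patch):

static int numReduceSinksForRS(OpTraits parentOpTraits) {
  int numReduceSinks = 1;                      // this reduce sink itself
  if (parentOpTraits != null) {
    numReduceSinks += parentOpTraits.getNumReduceSinks();
  }
  return numReduceSinks;
}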
@@ -135,8 +137,8 @@ public class OpTraitsRulesProcFactory {
// construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
if (!partitions.isEmpty()) {
for (Partition p : partitions) {
- List<String> fileNames =
- AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(),
+ List<String> fileNames =
+ AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(),
pGraphContext);
// The number of files for the table should be same as number of
// buckets.
@@ -187,7 +189,7 @@ public class OpTraitsRulesProcFactory {
sortedColsList.add(sortCols);
}
// num reduce sinks hardcoded to 0 because TS has no parents
- OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
+ OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0);
ts.setOpTraits(opTraits);
return null;
}
@@ -212,8 +214,13 @@ public class OpTraitsRulesProcFactory {
}
List<List<String>> listBucketCols = new ArrayList<List<String>>();
+ int numReduceSinks = 0;
+ OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits();
+ if (parentOpTraits != null) {
+ numReduceSinks = parentOpTraits.getNumReduceSinks();
+ }
listBucketCols.add(gbyKeys);
- OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
+ OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks);
gbyOp.setOpTraits(opTraits);
return null;
}
@@ -266,11 +273,13 @@ public class OpTraitsRulesProcFactory {
}
int numBuckets = -1;
+ int numReduceSinks = 0;
OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits();
if (parentOpTraits != null) {
numBuckets = parentOpTraits.getNumBuckets();
+ numReduceSinks = parentOpTraits.getNumReduceSinks();
}
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks);
selOp.setOpTraits(opTraits);
return null;
}
@@ -299,10 +308,13 @@ public class OpTraitsRulesProcFactory {
OpTraits parentOpTraits = rsOp.getOpTraits();
bucketColsList.add(getOutputColNames(joinOp, parentOpTraits.getBucketColNames(), pos));
sortColsList.add(getOutputColNames(joinOp, parentOpTraits.getSortCols(), pos));
+ if (parentOpTraits.getNumReduceSinks() > numReduceSinks) {
+ numReduceSinks = parentOpTraits.getNumReduceSinks();
+ }
pos++;
}
- joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
+ joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks));
return null;
}
@@ -355,7 +367,17 @@ public class OpTraitsRulesProcFactory {
Object... nodeOutputs) throws SemanticException {
@SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
- OpTraits opTraits = new OpTraits(null, -1, null);
+
+ int numReduceSinks = 0;
+ for (Operator<?> parentOp : operator.getParentOperators()) {
+ if (parentOp.getOpTraits() == null) {
+ continue;
+ }
+ if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) {
+ numReduceSinks = parentOp.getOpTraits().getNumReduceSinks();
+ }
+ }
+ OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks);
operator.setOpTraits(opTraits);
return null;
}
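The join and default rules above both propagate the maximum over all parents, since the deepest chain of reduce sinks decides whether an operator sits on the reduce side, while the table-scan rule pins the count to 0. The multi-parent case, condensed into a hypothetical helper (assumes hive-exec types):

import java.util.List;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.plan.OpTraits;

static int maxNumReduceSinks(List<Operator<?>> parents) {
  int numReduceSinks = 0;
  for (Operator<?> parentOp : parents) {
    OpTraits traits = parentOp.getOpTraits();
    if (traits != null && traits.getNumReduceSinks() > numReduceSinks) {
      numReduceSinks = traits.getNumReduceSinks(); // keep the deepest chain
    }
  }
  return numReduceSinks;
}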
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 005fad2..7faff88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -113,7 +113,7 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
}
// we can set the traits for this join operator
- OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null);
+ OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks());
mapJoinOp.setOpTraits(opTraits);
mapJoinOp.setStatistics(joinOp.getStatistics());
setNumberOfBucketsOnChildren(mapJoinOp);
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
index 1c76586..ff8cfbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
@@ -25,11 +25,14 @@ public class OpTraits {
List<List<String>> bucketColNames;
List<List<String>> sortColNames;
int numBuckets;
+ int numReduceSinks;
- public OpTraits(List<List<String>> bucketColNames, int numBuckets, List<List<String>> sortColNames) {
+ public OpTraits(List<List<String>> bucketColNames, int numBuckets,
+ List<List<String>> sortColNames, int numReduceSinks) {
this.bucketColNames = bucketColNames;
this.numBuckets = numBuckets;
this.sortColNames = sortColNames;
+ this.numReduceSinks = numReduceSinks;
}
public List<List<String>> getBucketColNames() {
@@ -55,6 +58,16 @@ public class OpTraits {
public List<List<String>> getSortCols() {
return sortColNames;
}
+
+
+ public void setNumReduceSinks(int numReduceSinks) {
+ this.numReduceSinks = numReduceSinks;
+ }
+
+ public int getNumReduceSinks() {
+ return this.numReduceSinks;
+ }
+
@Override
public String toString() {
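Every OpTraits call site now passes the extra argument; a tiny usage sketch of the extended constructor (demo only, assumes hive-exec on the classpath):

import org.apache.hadoop.hive.ql.plan.OpTraits;

public class OpTraitsDemo {
  public static void main(String[] args) {
    // A map join wipes bucket/sort info but still carries the RS count forward:
    OpTraits traits = new OpTraits(null, -1, null, /* numReduceSinks */ 2);
    System.out.println(traits.getNumReduceSinks()); // 2
  }
}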
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/test/queries/clientpositive/tez_smb_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_1.q b/ql/src/test/queries/clientpositive/tez_smb_1.q
index 6862a9a..4e8d1c0 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_1.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_1.q
@@ -87,3 +87,23 @@ join
(select rt2.id from
(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
where vt1.id=vt2.id;
+
+set hive.auto.convert.sortmerge.join.reduce.side=false;
+
+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
index 4782c18..94e519e 100644
--- a/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
@@ -622,3 +622,164 @@ POSTHOOK: Input: default@tab_part
POSTHOOK: Input: default@tab_part@ds=2008-04-08
#### A masked pattern was here ####
480
+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+ Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+ Reducer 6 <- Map 5 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: llap
+ LLAP IO: no inputs
+ Map 5
+ Map Operator Tree:
+ TableScan
+ alias: t2
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+ Execution mode: llap
+ LLAP IO: no inputs
+ Reducer 2
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+ Reducer 3
+ Execution mode: llap
+ Reduce Operator Tree:
+ Merge Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 _col0 (type: int)
+ Statistics: Num rows: 550 Data size: 10243 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 4
+ Execution mode: llap
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 6
+ Execution mode: llap
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: int)
+ Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480
[3/3] hive git commit: HIVE-15278 : PTF+MergeJoin = NPE (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Posted by se...@apache.org.
HIVE-15278 : PTF+MergeJoin = NPE (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2feaa5dc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2feaa5dc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2feaa5dc
Branch: refs/heads/master
Commit: 2feaa5dc99febb8fd0367d8bfa4fe20d44930adc
Parents: 12130c3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 13:24:35 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:28 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java | 8 ++++++--
ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java | 8 ++++++--
2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2feaa5dc/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 002e49b..0b8eae8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -384,11 +384,15 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
this.nextKeyWritables[t] = null;
}
}
+
+ @Override
+ public void close(boolean abort) throws HiveException {
+ joinFinalLeftData(); // Do this WITHOUT checking for parents
+ super.close(abort);
+ }
@Override
public void closeOp(boolean abort) throws HiveException {
- joinFinalLeftData();
-
super.closeOp(abort);
// clean up
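The fix hinges on call order: close() runs on every close request, while closeOp() only fires once the operator actually transitions to CLOSE, i.e. after all parents are done. Moving joinFinalLeftData() into close() guarantees the left side is drained even when some parent has not closed yet. Simplified shape of the base-class gating (see the Operator.java hunk below; not verbatim):

public void close(boolean abort) throws HiveException {
  if (state == State.CLOSE) {
    return;              // already closed; a subclass's close() work still ran
  }
  // ... return early unless every parent has closed this operator ...
  state = State.CLOSE;
  closeOp(abort);        // operator-specific cleanup, runs at most once
}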
http://git-wip-us.apache.org/repos/asf/hive/blob/2feaa5dc/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 20f9d64..8b04cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -667,6 +667,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
// since it is called by its parents' main thread, so no
// more than 1 thread should call this close() function.
public void close(boolean abort) throws HiveException {
+ if (isLogDebugEnabled) {
+ LOG.debug("close called for operator " + this);
+ }
if (state == State.CLOSE) {
return;
@@ -683,12 +686,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
// set state as CLOSE as long as all parents are closed
// state == CLOSE doesn't mean all children are also in state CLOSE
state = State.CLOSE;
- if (isLogDebugEnabled) {
- LOG.debug(id + " finished. closing... ");
+ if (isLogInfoEnabled) {
+ LOG.info("Closing operator " + this);
}
abort |= abortOp.get();
+
// call the operator specific close routine
closeOp(abort);
[2/3] hive git commit: HIVE-15312 : reduce logging in certain places (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-15312 : reduce logging in certain places (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f9728ed
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f9728ed
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f9728ed
Branch: refs/heads/master
Commit: 2f9728ed3507247270df99bd9c178c3f928cc463
Parents: 98a25f2
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 11:25:16 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:27 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/hive/common/util/Ref.java | 32 +++++++++++++++
.../llap/shufflehandler/ShuffleHandler.java | 2 +-
.../tezplugins/LlapTaskSchedulerService.java | 6 +--
.../apache/hadoop/hive/ql/exec/Utilities.java | 18 +++++----
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 32 ++++++++++++---
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 41 ++++++++++++++++----
.../apache/hadoop/hive/ql/io/orc/OrcSplit.java | 6 ++-
7 files changed, 112 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/common/src/java/org/apache/hive/common/util/Ref.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Ref.java b/common/src/java/org/apache/hive/common/util/Ref.java
new file mode 100644
index 0000000..0f666dd
--- /dev/null
+++ b/common/src/java/org/apache/hive/common/util/Ref.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+/** Reference to T. */
+public final class Ref<T> {
+ public T value;
+
+ public Ref(T value) {
+ this.value = value;
+ }
+
+ public static <T> Ref<T> from(T t) {
+ return new Ref<T>(t);
+ }
+}
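Ref is deliberately minimal: it lets a callee write a single value back to the caller, which the ACID changes below use to share a tri-state Boolean across calls. Usage sketch:

import org.apache.hive.common.util.Ref;

public class RefDemo {
  public static void main(String[] args) {
    Ref<Boolean> useFileIds = Ref.from(null); // null = "capability unknown"
    useFileIds.value = true;                  // a callee can settle it later
    System.out.println(useFileIds.value);     // true
  }
}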
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 9a3e221..02f7911 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -838,7 +838,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+ connectionKeepAliveTimeOut);
- LOG.info("Content Length in shuffle : " + contentLength);
+ LOG.debug("Content Length in shuffle : " + contentLength);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 158772b..7838bef 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1211,7 +1211,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
} else {
// No tasks qualify as preemptable
- LOG.info("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+ LOG.debug("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
break;
}
}
@@ -1602,8 +1602,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean canAcceptTask() {
boolean result = !hadCommFailure && !disabled
&&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
- if (LOG.isInfoEnabled()) {
- LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
serviceInstance.getWorkerIdentity() + "]: " +
"canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 2b1d1ce..e8f50f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2974,6 +2974,8 @@ public final class Utilities {
// The alias may not have any path
Path path = null;
+ boolean hasLogged = false;
+ // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
List<String> aliases = work.getPathToAliases().get(file);
if (aliases.contains(alias)) {
@@ -2986,13 +2988,15 @@ public final class Utilities {
}
pathsProcessed.add(path);
-
- LOG.info("Adding input file " + path);
- if (!skipDummy
- && isEmptyPath(job, path, ctx)) {
- path = createDummyFileForEmptyPartition(path, job, work,
- hiveScratchDir);
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding input file " + path);
+ } else if (!hasLogged) {
+ hasLogged = true;
+ LOG.info("Adding " + work.getPathToAliases().size()
+ + " inputs; the first input is " + path);
+ }
+ if (!skipDummy && isEmptyPath(job, path, ctx)) {
+ path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
}
pathsToAdd.add(path);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index f1eba5d..da00bb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -763,6 +764,15 @@ public class AcidUtils {
boolean useFileIds,
boolean ignoreEmptyFiles
) throws IOException {
+ return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+ }
+
+ public static Directory getAcidState(Path directory,
+ Configuration conf,
+ ValidTxnList txnList,
+ Ref<Boolean> useFileIds,
+ boolean ignoreEmptyFiles
+ ) throws IOException {
FileSystem fs = directory.getFileSystem(conf);
// The following 'deltas' includes all kinds of delta files including insert & delete deltas.
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
@@ -770,12 +780,18 @@ public class AcidUtils {
List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
final List<FileStatus> obsolete = new ArrayList<FileStatus>();
List<HdfsFileStatusWithId> childrenWithId = null;
- if (useFileIds) {
+ Boolean val = useFileIds.value;
+ if (val == null || val) {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+ if (val == null) {
+ useFileIds.value = true;
+ }
} catch (Throwable t) {
LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
- useFileIds = false;
+ if (val == null && t instanceof UnsupportedOperationException) {
+ useFileIds.value = false;
+ }
}
}
TxnBase bestBase = new TxnBase();
@@ -995,15 +1011,21 @@ public class AcidUtils {
* @throws IOException
*/
private static void findOriginals(FileSystem fs, FileStatus stat,
- List<HdfsFileStatusWithId> original, boolean useFileIds) throws IOException {
+ List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds) throws IOException {
assert stat.isDir();
List<HdfsFileStatusWithId> childrenWithId = null;
- if (useFileIds) {
+ Boolean val = useFileIds.value;
+ if (val == null || val) {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
+ if (val == null) {
+ useFileIds.value = true;
+ }
} catch (Throwable t) {
LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
- useFileIds = false;
+ if (val == null && t instanceof UnsupportedOperationException) {
+ useFileIds.value = false;
+ }
}
}
if (childrenWithId != null) {
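This tri-state idiom (null = untested, true = the fast API works, false = permanently unavailable) appears in getAcidState and findOriginals above, and twice more in OrcInputFormat below. Condensed into one hypothetical helper, with listWithIds standing in for the SHIMS.listLocatedHdfsStatus call:

import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hive.common.util.Ref;

static List<HdfsFileStatusWithId> tryFastListing(Ref<Boolean> useFileIds,
    Callable<List<HdfsFileStatusWithId>> listWithIds) {
  Boolean val = useFileIds.value;
  if (val == null || val) {                // unknown or known-good: attempt it
    try {
      List<HdfsFileStatusWithId> result = listWithIds.call();
      if (val == null) {
        useFileIds.value = true;           // first success: the API exists
      }
      return result;
    } catch (Throwable t) {
      if (val == null && t instanceof UnsupportedOperationException) {
        useFileIds.value = false;          // API missing: disable once, globally
      }
    }
  }
  return null;                             // caller falls back to the plain API
}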
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 3fe93ac..361901e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
@@ -1015,11 +1016,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final Context context;
private final FileSystem fs;
private final Path dir;
- private final boolean useFileIds;
+ private final Ref<Boolean> useFileIds;
private final UserGroupInformation ugi;
FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
UserGroupInformation ugi) {
+ this(context, fs, dir, Ref.from(useFileIds), ugi);
+ }
+
+ FileGenerator(Context context, FileSystem fs, Path dir, Ref<Boolean> useFileIds,
+ UserGroupInformation ugi) {
this.context = context;
this.fs = fs;
this.dir = dir;
@@ -1082,16 +1088,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
} else {
// This is a normal insert delta, which only has insert events and hence all the files
// in this delta directory can be considered as a base.
- if (useFileIds) {
+ Boolean val = useFileIds.value;
+ if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
}
+ if (val == null) {
+ useFileIds.value = true; // The call succeeded, so presumably the API is there.
+ }
continue; // move on to process to the next parsedDelta.
} catch (Throwable t) {
LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+ if (val == null && t instanceof UnsupportedOperationException) {
+ useFileIds.value = false;
+ }
}
}
// Fall back to regular API and create statuses without ID.
@@ -1112,12 +1125,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
private List<HdfsFileStatusWithId> findBaseFiles(
- Path base, boolean useFileIds) throws IOException {
- if (useFileIds) {
+ Path base, Ref<Boolean> useFileIds) throws IOException {
+ Boolean val = useFileIds.value;
+ if (val == null || val) {
try {
- return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
+ List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
+ fs, base, AcidUtils.hiddenFileFilter);
+ if (val == null) {
+ useFileIds.value = true; // The call succeeded, so presumably the API is there.
+ }
+ return result;
} catch (Throwable t) {
LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+ if (val == null && t instanceof UnsupportedOperationException) {
+ useFileIds.value = false;
+ }
}
}
@@ -1542,8 +1564,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (LOG.isInfoEnabled()) {
LOG.info("ORC pushdown predicate: " + context.sarg);
}
- boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
- boolean allowSyntheticFileIds = useFileIds && HiveConf.getBoolVar(
+ boolean useFileIdsConfig = HiveConf.getBoolVar(
+ conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
+ // Sharing this state assumes splits will succeed or fail to get it together (same FS).
+ // We also start with null and only set it to true on the first call, so we would only do
+ // the global-disable thing on the first failure w/the API error, not any random failure.
+ Ref<Boolean> useFileIds = Ref.from(useFileIdsConfig ? null : false);
+ boolean allowSyntheticFileIds = useFileIdsConfig && HiveConf.getBoolVar(
conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
List<OrcSplit> splits = Lists.newArrayList();
List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 969e70e..d61b24b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -98,8 +98,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
int additional = bos.size() - required;
out.write(bos.toByteArray());
- LOG.info("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", additional,
- required);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.",
+ additional, required);
+ }
}
private void writeAdditionalPayload(final DataOutputStream out) throws IOException {