You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/02 21:32:54 UTC

[1/3] hive git commit: HIVE-15323 : allow the user to turn off reduce-side SMB join (Sergey Shelukhin, reviewed by Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/master 98a25f2d8 -> 2feaa5dc9


HIVE-15323 : allow the user to turn off reduce-side SMB join (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/12130c3e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/12130c3e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/12130c3e

Branch: refs/heads/master
Commit: 12130c3ee71c2499d8224624a0e4cc7109727579
Parents: 2f9728e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 13:22:20 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:27 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |  21 ++-
 .../annotation/OpTraitsRulesProcFactory.java    |  38 ++++-
 .../optimizer/spark/SparkMapJoinOptimizer.java  |   2 +-
 .../apache/hadoop/hive/ql/plan/OpTraits.java    |  15 +-
 ql/src/test/queries/clientpositive/tez_smb_1.q  |  20 +++
 .../results/clientpositive/llap/tez_smb_1.q.out | 161 +++++++++++++++++++
 7 files changed, 241 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b809562..9064e49 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1506,6 +1506,8 @@ public class HiveConf extends Configuration {
 
     HIVE_AUTO_SORTMERGE_JOIN("hive.auto.convert.sortmerge.join", false,
         "Will the join be automatically converted to a sort-merge join, if the joined tables pass the criteria for sort-merge join."),
+    HIVE_AUTO_SORTMERGE_JOIN_REDUCE("hive.auto.convert.sortmerge.join.reduce.side", true,
+        "Whether hive.auto.convert.sortmerge.join (if enabled) should be applied to reduce side."),
     HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR(
         "hive.auto.convert.sortmerge.join.bigtable.selection.policy",
         "org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ",

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 2b93e01..7441f1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -29,6 +29,7 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
@@ -147,7 +148,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos, true);
     // map join operator by default has no bucket cols and num of reduce sinks
     // reduced by 1
-    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null, joinOp.getOpTraits().getNumReduceSinks()));
     mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
@@ -162,7 +163,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
     // we cannot convert to bucket map join, we cannot convert to
     // map join either based on the size. Check if we can convert to SMB join.
-    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+    if ((HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false)
+      || ((!HiveConf.getBoolVar(context.conf, ConfVars.HIVE_AUTO_SORTMERGE_JOIN_REDUCE))
+          && joinOp.getOpTraits().getNumReduceSinks() >= 2)) {
       fallbackToReduceSideJoin(joinOp, context);
       return null;
     }
@@ -236,9 +239,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         (CommonMergeJoinOperator) OperatorFactory.get(joinOp.getCompilationOpContext(),
             new CommonMergeJoinDesc(numBuckets, mapJoinConversionPos, mapJoinDesc),
             joinOp.getSchema());
-    OpTraits opTraits =
-        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
-            .getSortCols());
+    int numReduceSinks = joinOp.getOpTraits().getNumReduceSinks();
+    OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets,
+      joinOp.getOpTraits().getSortCols(), numReduceSinks);
     mergeJoinOp.setOpTraits(opTraits);
     mergeJoinOp.setStatistics(joinOp.getStatistics());
 
@@ -304,7 +307,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(), opTraits.getNumBuckets(), opTraits.getSortCols()));
+    currentOp.setOpTraits(new OpTraits(opTraits.getBucketColNames(),
+      opTraits.getNumBuckets(), opTraits.getSortCols(), opTraits.getNumReduceSinks()));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -331,7 +335,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
 
     // we can set the traits for this join operator
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-            tezBucketJoinProcCtx.getNumBuckets(), null);
+        tezBucketJoinProcCtx.getNumBuckets(), null, joinOp.getOpTraits().getNumReduceSinks());
     mapJoinOp.setOpTraits(opTraits);
     mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);
@@ -851,7 +855,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         OpTraits opTraits = new OpTraits(
             joinOp.getOpTraits().getBucketColNames(),
             numReducers,
-            null);
+            null,
+            joinOp.getOpTraits().getNumReduceSinks());
         mapJoinOp.setOpTraits(opTraits);
         mapJoinOp.setStatistics(joinOp.getStatistics());
         // propagate this change till the next RS

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
index 1e89016..875ee9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
@@ -106,11 +106,13 @@ public class OpTraitsRulesProcFactory {
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
       listBucketCols.add(bucketCols);
       int numBuckets = -1;
+      int numReduceSinks = 1;
       OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getTraits();
       if (parentOpTraits != null) {
         numBuckets = parentOpTraits.getNumBuckets();
+        numReduceSinks += parentOpTraits.getNumReduceSinks();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols, numReduceSinks);
       rs.setOpTraits(opTraits);
       return null;
     }
@@ -135,8 +137,8 @@ public class OpTraitsRulesProcFactory {
         // construct a mapping of (Partition->bucket file names) and (Partition -> bucket number)
         if (!partitions.isEmpty()) {
           for (Partition p : partitions) {
-            List<String> fileNames =
-                AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(),
+            List<String> fileNames = 
+                AbstractBucketJoinProc.getBucketFilePathsOfPartition(p.getDataLocation(), 
                     pGraphContext);
             // The number of files for the table should be same as number of
             // buckets.
@@ -187,7 +189,7 @@ public class OpTraitsRulesProcFactory {
         sortedColsList.add(sortCols);
       }
       // num reduce sinks hardcoded to 0 because TS has no parents
-      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
+      OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList, 0);
       ts.setOpTraits(opTraits);
       return null;
     }
@@ -212,8 +214,13 @@ public class OpTraitsRulesProcFactory {
       }
 
       List<List<String>> listBucketCols = new ArrayList<List<String>>();
+      int numReduceSinks = 0;
+      OpTraits parentOpTraits = gbyOp.getParentOperators().get(0).getOpTraits();
+      if (parentOpTraits != null) {
+        numReduceSinks = parentOpTraits.getNumReduceSinks();
+      }
       listBucketCols.add(gbyKeys);
-      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
+      OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols, numReduceSinks);
       gbyOp.setOpTraits(opTraits);
       return null;
     }
@@ -266,11 +273,13 @@ public class OpTraitsRulesProcFactory {
       }
 
       int numBuckets = -1;
+      int numReduceSinks = 0;
       OpTraits parentOpTraits = selOp.getParentOperators().get(0).getOpTraits();
       if (parentOpTraits != null) {
         numBuckets = parentOpTraits.getNumBuckets();
+        numReduceSinks = parentOpTraits.getNumReduceSinks();
       }
-      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
+      OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols, numReduceSinks);
       selOp.setOpTraits(opTraits);
       return null;
     }
@@ -299,10 +308,13 @@ public class OpTraitsRulesProcFactory {
         OpTraits parentOpTraits = rsOp.getOpTraits();
         bucketColsList.add(getOutputColNames(joinOp, parentOpTraits.getBucketColNames(), pos));
         sortColsList.add(getOutputColNames(joinOp, parentOpTraits.getSortCols(), pos));
+        if (parentOpTraits.getNumReduceSinks() > numReduceSinks) {
+          numReduceSinks = parentOpTraits.getNumReduceSinks();
+        }
         pos++;
       }
 
-      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
+      joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList, numReduceSinks));
       return null;
     }
 
@@ -355,7 +367,17 @@ public class OpTraitsRulesProcFactory {
         Object... nodeOutputs) throws SemanticException {
       @SuppressWarnings("unchecked")
       Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
-      OpTraits opTraits = new OpTraits(null, -1, null);
+
+      int numReduceSinks = 0;
+      for (Operator<?> parentOp : operator.getParentOperators()) {
+        if (parentOp.getOpTraits() == null) {
+          continue;
+        }
+        if (parentOp.getOpTraits().getNumReduceSinks() > numReduceSinks) {
+          numReduceSinks = parentOp.getOpTraits().getNumReduceSinks();
+        }
+      }
+      OpTraits opTraits = new OpTraits(null, -1, null, numReduceSinks);
       operator.setOpTraits(opTraits);
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 005fad2..7faff88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -113,7 +113,7 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     }
 
     // we can set the traits for this join operator
-    OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null);
+    OpTraits opTraits = new OpTraits(bucketColNames, numBuckets, null, joinOp.getOpTraits().getNumReduceSinks());
     mapJoinOp.setOpTraits(opTraits);
     mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
index 1c76586..ff8cfbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
@@ -25,11 +25,14 @@ public class OpTraits {
   List<List<String>> bucketColNames;
   List<List<String>> sortColNames;
   int numBuckets;
+  int numReduceSinks;
 
-  public OpTraits(List<List<String>> bucketColNames, int numBuckets, List<List<String>> sortColNames) {
+  public OpTraits(List<List<String>> bucketColNames, int numBuckets,
+      List<List<String>> sortColNames, int numReduceSinks) {
     this.bucketColNames = bucketColNames;
     this.numBuckets = numBuckets;
     this.sortColNames = sortColNames;
+    this.numReduceSinks = numReduceSinks;
   }
 
   public List<List<String>> getBucketColNames() {
@@ -55,6 +58,16 @@ public class OpTraits {
   public List<List<String>> getSortCols() {
     return sortColNames;
   }
+
+
+  public void setNumReduceSinks(int numReduceSinks) {
+    this.numReduceSinks = numReduceSinks;
+  }
+
+  public int getNumReduceSinks() {
+    return this.numReduceSinks;
+  }
+
   
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/test/queries/clientpositive/tez_smb_1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_smb_1.q b/ql/src/test/queries/clientpositive/tez_smb_1.q
index 6862a9a..4e8d1c0 100644
--- a/ql/src/test/queries/clientpositive/tez_smb_1.q
+++ b/ql/src/test/queries/clientpositive/tez_smb_1.q
@@ -87,3 +87,23 @@ join
 (select rt2.id from
 (select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
 where vt1.id=vt2.id;
+
+set hive.auto.convert.sortmerge.join.reduce.side=false;
+
+explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id;
+

http://git-wip-us.apache.org/repos/asf/hive/blob/12130c3e/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
index 4782c18..94e519e 100644
--- a/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_smb_1.q.out
@@ -622,3 +622,164 @@ POSTHOOK: Input: default@tab_part
 POSTHOOK: Input: default@tab_part@ds=2008-04-08
 #### A masked pattern was here ####
 480
+PREHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+        Reducer 6 <- Map 5 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: t1
+                  Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: t2
+                  Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int), _col1 (type: string)
+                        sort order: ++
+                        Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 242 Data size: 4502 Basic stats: COMPLETE Column stats: NONE
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                Statistics: Num rows: 550 Data size: 10243 Basic stats: COMPLETE Column stats: NONE
+                Group By Operator
+                  aggregations: count()
+                  mode: hash
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    sort order: 
+                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: bigint)
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 6 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: int)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: int)
+                  Statistics: Num rows: 500 Data size: 9312 Basic stats: COMPLETE Column stats: NONE
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from
+(select rt1.id from
+(select t1.key as id, t1.value as od from tab t1 order by id, od) rt1) vt1
+join
+(select rt2.id from
+(select t2.key as id, t2.value as od from tab_part t2 order by id, od) rt2) vt2
+where vt1.id=vt2.id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+480


[3/3] hive git commit: HIVE-15278 : PTF+MergeJoin = NPE (Sergey Shelukhin, reviewed by Gunther Hagleitner)

Posted by se...@apache.org.
HIVE-15278 : PTF+MergeJoin = NPE (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2feaa5dc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2feaa5dc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2feaa5dc

Branch: refs/heads/master
Commit: 2feaa5dc99febb8fd0367d8bfa4fe20d44930adc
Parents: 12130c3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 13:24:35 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:28 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java  | 8 ++++++--
 ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java     | 8 ++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2feaa5dc/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 002e49b..0b8eae8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -384,11 +384,15 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
       this.nextKeyWritables[t] = null;
     }
   }
+  
+  @Override
+  public void close(boolean abort) throws HiveException {
+    joinFinalLeftData(); // Do this WITHOUT checking for parents
+    super.close(abort);
+  }
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    joinFinalLeftData();
-
     super.closeOp(abort);
 
     // clean up

http://git-wip-us.apache.org/repos/asf/hive/blob/2feaa5dc/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 20f9d64..8b04cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -667,6 +667,9 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
   // since it is called by its parents' main thread, so no
   // more than 1 thread should call this close() function.
   public void close(boolean abort) throws HiveException {
+    if (isLogDebugEnabled) {
+      LOG.debug("close called for operator " + this);
+    }
 
     if (state == State.CLOSE) {
       return;
@@ -683,12 +686,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C
     // set state as CLOSE as long as all parents are closed
     // state == CLOSE doesn't mean all children are also in state CLOSE
     state = State.CLOSE;
-    if (isLogDebugEnabled) {
-      LOG.debug(id + " finished. closing... ");
+    if (isLogInfoEnabled) {
+      LOG.info("Closing operator " + this);
     }
 
     abort |= abortOp.get();
 
+
     // call the operator specific close routine
     closeOp(abort);
 


[2/3] hive git commit: HIVE-15312 : reduce logging in certain places (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-15312 : reduce logging in certain places (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f9728ed
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f9728ed
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f9728ed

Branch: refs/heads/master
Commit: 2f9728ed3507247270df99bd9c178c3f928cc463
Parents: 98a25f2
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 2 11:25:16 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 2 13:32:27 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/hive/common/util/Ref.java   | 32 +++++++++++++++
 .../llap/shufflehandler/ShuffleHandler.java     |  2 +-
 .../tezplugins/LlapTaskSchedulerService.java    |  6 +--
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 18 +++++----
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 32 ++++++++++++---
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 41 ++++++++++++++++----
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |  6 ++-
 7 files changed, 112 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/common/src/java/org/apache/hive/common/util/Ref.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Ref.java b/common/src/java/org/apache/hive/common/util/Ref.java
new file mode 100644
index 0000000..0f666dd
--- /dev/null
+++ b/common/src/java/org/apache/hive/common/util/Ref.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+/** Mutable single-value holder: wraps one reference of type T in a public, reassignable field. */
+public final class Ref<T> {
+  public T value; // not final: any code holding this Ref can read or replace the wrapped value
+
+  public Ref(T value) {
+    this.value = value;
+  }
+
+  public static <T> Ref<T> from(T t) {
+    return new Ref<T>(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index 9a3e221..02f7911 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -838,7 +838,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
         response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
         response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
             + connectionKeepAliveTimeOut);
-        LOG.info("Content Length in shuffle : " + contentLength);
+        LOG.debug("Content Length in shuffle : " + contentLength);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 158772b..7838bef 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -1211,7 +1211,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
         } else {
           // No tasks qualify as preemptable
-          LOG.info("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+          LOG.debug("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
           break;
         }
       }
@@ -1602,8 +1602,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     boolean canAcceptTask() {
       boolean result = !hadCommFailure && !disabled
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
                 serviceInstance.getWorkerIdentity() + "]: " +
                 "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
             result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 2b1d1ce..e8f50f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2974,6 +2974,8 @@ public final class Utilities {
 
       // The alias may not have any path
       Path path = null;
+      boolean hasLogged = false;
+      // Note: this copies the list because createDummyFileForEmptyPartition may modify the map.
       for (Path file : new LinkedList<Path>(work.getPathToAliases().keySet())) {
         List<String> aliases = work.getPathToAliases().get(file);
         if (aliases.contains(alias)) {
@@ -2986,13 +2988,15 @@ public final class Utilities {
           }
 
           pathsProcessed.add(path);
-
-          LOG.info("Adding input file " + path);
-          if (!skipDummy
-              && isEmptyPath(job, path, ctx)) {
-            path = createDummyFileForEmptyPartition(path, job, work,
-                 hiveScratchDir);
-
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding input file " + path);
+          } else if (!hasLogged) {
+            hasLogged = true;
+            LOG.info("Adding " + work.getPathToAliases().size()
+                + " inputs; the first input is " + path);
+          }
+          if (!skipDummy && isEmptyPath(job, path, ctx)) {
+            path = createDummyFileForEmptyPartition(path, job, work, hiveScratchDir);
           }
           pathsToAdd.add(path);
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index f1eba5d..da00bb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -763,6 +764,15 @@ public class AcidUtils {
                                        boolean useFileIds,
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
+    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+  }
+
+  public static Directory getAcidState(Path directory,
+                                       Configuration conf,
+                                       ValidTxnList txnList,
+                                       Ref<Boolean> useFileIds,
+                                       boolean ignoreEmptyFiles
+                                       ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
@@ -770,12 +780,18 @@ public class AcidUtils {
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
     List<HdfsFileStatusWithId> childrenWithId = null;
-    if (useFileIds) {
+    Boolean val = useFileIds.value;
+    if (val == null || val) {
       try {
         childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
       } catch (Throwable t) {
         LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        useFileIds = false;
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
       }
     }
     TxnBase bestBase = new TxnBase();
@@ -995,15 +1011,21 @@ public class AcidUtils {
    * @throws IOException
    */
   private static void findOriginals(FileSystem fs, FileStatus stat,
-      List<HdfsFileStatusWithId> original, boolean useFileIds) throws IOException {
+      List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds) throws IOException {
     assert stat.isDir();
     List<HdfsFileStatusWithId> childrenWithId = null;
-    if (useFileIds) {
+    Boolean val = useFileIds.value;
+    if (val == null || val) {
       try {
         childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
       } catch (Throwable t) {
         LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        useFileIds = false;
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
       }
     }
     if (childrenWithId != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 3fe93ac..361901e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -106,6 +106,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
@@ -1015,11 +1016,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final Context context;
     private final FileSystem fs;
     private final Path dir;
-    private final boolean useFileIds;
+    private final Ref<Boolean> useFileIds;
     private final UserGroupInformation ugi;
 
     FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
         UserGroupInformation ugi) {
+      this(context, fs, dir, Ref.from(useFileIds), ugi);
+    }
+
+    FileGenerator(Context context, FileSystem fs, Path dir, Ref<Boolean> useFileIds,
+        UserGroupInformation ugi) {
       this.context = context;
       this.fs = fs;
       this.dir = dir;
@@ -1082,16 +1088,23 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           } else {
             // This is a normal insert delta, which only has insert events and hence all the files
             // in this delta directory can be considered as a base.
-            if (useFileIds) {
+            Boolean val = useFileIds.value;
+            if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
                     SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
                   baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA));
                 }
+                if (val == null) {
+                  useFileIds.value = true; // The call succeeded, so presumably the API is there.
+                }
                 continue; // move on to process to the next parsedDelta.
               } catch (Throwable t) {
                 LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+                if (val == null && t instanceof UnsupportedOperationException) {
+                  useFileIds.value = false;
+                }
               }
             }
             // Fall back to regular API and create statuses without ID.
@@ -1112,12 +1125,21 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private List<HdfsFileStatusWithId> findBaseFiles(
-        Path base, boolean useFileIds) throws IOException {
-      if (useFileIds) {
+        Path base, Ref<Boolean> useFileIds) throws IOException {
+      Boolean val = useFileIds.value;
+      if (val == null || val) {
         try {
-          return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
+          List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
+              fs, base, AcidUtils.hiddenFileFilter);
+          if (val == null) {
+            useFileIds.value = true; // The call succeeded, so presumably the API is there.
+          }
+          return result;
         } catch (Throwable t) {
           LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+          if (val == null && t instanceof UnsupportedOperationException) {
+            useFileIds.value = false;
+          }
         }
       }
 
@@ -1542,8 +1564,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     if (LOG.isInfoEnabled()) {
       LOG.info("ORC pushdown predicate: " + context.sarg);
     }
-    boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
-    boolean allowSyntheticFileIds = useFileIds && HiveConf.getBoolVar(
+    boolean useFileIdsConfig = HiveConf.getBoolVar(
+        conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
+    // Sharing this state assumes all splits succeed or fail to get file IDs together (same FS).
+    // We start with null and set it to true after the first successful call, so file IDs are
+    // disabled globally only if the first failure is the API error, not on any random failure.
+    Ref<Boolean> useFileIds = Ref.from(useFileIdsConfig ? null : false);
+    boolean allowSyntheticFileIds = useFileIdsConfig && HiveConf.getBoolVar(
         conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/hive/blob/2f9728ed/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 969e70e..d61b24b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -98,8 +98,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     int additional = bos.size() - required;
 
     out.write(bos.toByteArray());
-    LOG.info("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.", additional,
-        required);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.",
+          additional, required);
+    }
   }
 
   private void writeAdditionalPayload(final DataOutputStream out) throws IOException {