You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2017/04/23 03:46:49 UTC

[5/5] hive git commit: HIVE-16421 Runtime filtering breaks user-level explain (Pengcheng Xiong, reviewed by Ashutosh Chauhan)

HIVE-16421 Runtime filtering breaks user-level explain (Pengcheng Xiong, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eaa439e3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eaa439e3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eaa439e3

Branch: refs/heads/master
Commit: eaa439e394514de0c38602d01a98d2e6237c0da8
Parents: 0b6a48d
Author: Pengcheng Xiong <px...@hortonworks.com>
Authored: Sat Apr 22 20:46:22 2017 -0700
Committer: Pengcheng Xiong <px...@hortonworks.com>
Committed: Sat Apr 22 20:46:22 2017 -0700

----------------------------------------------------------------------
 .../hive/common/jsonexplain/tez/Connection.java |    7 +-
 .../hadoop/hive/common/jsonexplain/tez/Op.java  |   58 +-
 .../hive/common/jsonexplain/tez/Vertex.java     |   45 +-
 .../test/resources/testconfiguration.properties |    1 +
 .../dynamic_semijoin_user_level.q               |  106 ++
 .../clientpositive/udf_round_2_auto_stats.q     |   16 +
 .../llap/dynamic_semijoin_user_level.q.out      | 1495 ++++++++++++++++
 .../clientpositive/llap/explainuser_2.q.out     | 1624 +++++-------------
 .../clientpositive/udf_round_2_auto_stats.q.out |   55 +
 9 files changed, 2206 insertions(+), 1201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
index d341cb1..5cd0e4c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hive.common.jsonexplain.tez;
 
-public final class Connection {
+public final class Connection implements Comparable<Connection>{
   public final String type;
   public final Vertex from;
 
@@ -27,4 +27,9 @@ public final class Connection {
     this.type = type;
     this.from = from;
   }
+
+  @Override
+  public int compareTo(Connection o) {
+    return from.compareTo(o.from);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
index 718791c..96e75c0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.common.jsonexplain.tez;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -120,19 +121,18 @@ public final class Op {
         for (String key : JSONObject.getNames(keys)) {
           // first search from the posToVertex
           if (posToVertex.containsKey(key)) {
-            Vertex vertex = posToVertex.get(key);
-            if (vertex.rootOps.size() == 1) {
-              posToOpId.put(key, vertex.rootOps.get(0).operatorId);
-            } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
-              posToOpId.put(key, vertex.name);
+            Vertex v = posToVertex.get(key);
+            if (v.rootOps.size() == 1) {
+              posToOpId.put(key, v.rootOps.get(0).operatorId);
+            } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+              posToOpId.put(key, v.name);
             } else {
-              Op singleRSOp = vertex.getSingleRSOp();
-              if (singleRSOp != null) {
-                posToOpId.put(key, singleRSOp.operatorId);
+              Op joinRSOp = v.getJoinRSOp(vertex);
+              if (joinRSOp != null) {
+                posToOpId.put(key, joinRSOp.operatorId);
               } else {
                 throw new Exception(
-                    "There are none or more than one root operators in a single vertex "
-                        + vertex.name
+                    "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
                         + " when hive explain user is trying to identify the operator id.");
               }
             }
@@ -143,20 +143,19 @@ public final class Op {
           }
           // then assume it is from its own vertex
           else if (parentVertexes.size() == 1) {
-            Vertex vertex = parentVertexes.iterator().next();
+            Vertex v = parentVertexes.iterator().next();
             parentVertexes.clear();
-            if (vertex.rootOps.size() == 1) {
-              posToOpId.put(key, vertex.rootOps.get(0).operatorId);
-            } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
-              posToOpId.put(key, vertex.name);
+            if (v.rootOps.size() == 1) {
+              posToOpId.put(key, v.rootOps.get(0).operatorId);
+            } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+              posToOpId.put(key, v.name);
             } else {
-              Op singleRSOp = vertex.getSingleRSOp();
-              if (singleRSOp != null) {
-                posToOpId.put(key, singleRSOp.operatorId);
+              Op joinRSOp = v.getJoinRSOp(vertex);
+              if (joinRSOp != null) {
+                posToOpId.put(key, joinRSOp.operatorId);
               } else {
                 throw new Exception(
-                    "There are none or more than one root operators in a single vertex "
-                        + vertex.name
+                    "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
                         + " when hive explain user is trying to identify the operator id.");
               }
             }
@@ -207,12 +206,12 @@ public final class Op {
               } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
                 posToOpId.put(entry.getKey(), v.name);
               } else {
-                Op singleRSOp = v.getSingleRSOp();
-                if (singleRSOp != null) {
-                  posToOpId.put(entry.getKey(), singleRSOp.operatorId);
+                Op joinRSOp = v.getJoinRSOp(vertex);
+                if (joinRSOp != null) {
+                  posToOpId.put(entry.getKey(), joinRSOp.operatorId);
                 } else {
                   throw new Exception(
-                      "There are none or more than one root operators in a single vertex " + v.name
+                      "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
                           + " when hive explain user is trying to identify the operator id.");
                 }
               }
@@ -336,8 +335,9 @@ public final class Op {
     }
     // print inline vertex
     if (parser.inlineMap.containsKey(this)) {
-      for (int index = 0; index < parser.inlineMap.get(this).size(); index++) {
-        Connection connection = parser.inlineMap.get(this).get(index);
+      List<Connection> connections = parser.inlineMap.get(this);
+      Collections.sort(connections);
+      for (Connection connection : connections) {
         connection.from.print(printer, indentFlag, connection.type, this.vertex);
       }
     }
@@ -347,9 +347,9 @@ public final class Op {
     }
     // print next vertex
     else {
-      for (int index = 0; index < noninlined.size(); index++) {
-        Vertex v = noninlined.get(index).from;
-        v.print(printer, indentFlag, noninlined.get(index).type, this.vertex);
+      Collections.sort(noninlined);
+      for (Connection connection : noninlined) {
+        connection.from.print(printer, indentFlag, connection.type, this.vertex);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
index 3d559bd..13ecac0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.hive.common.jsonexplain.tez;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hive.common.jsonexplain.tez.Op.OpType;
-import org.apache.hadoop.util.hash.Hash;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.json.JSONArray;
@@ -53,8 +50,8 @@ public final class Vertex implements Comparable<Vertex>{
   // we create a dummy vertex for a mergejoin branch for a self join if this
   // vertex is a mergejoin
   public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>();
-  // whether this vertex has multiple reduce operators
-  public boolean hasMultiReduceOp = false;
+  // count of root operators that are reduce sinks (RS), as computed by checkMultiReduceOperator()
+  public int numReduceOp = 0;
   // execution mode
   public String executionMode = "";
   // tagToInput for reduce work
@@ -217,7 +214,7 @@ public final class Vertex implements Comparable<Vertex>{
   public void print(Printer printer, int indentFlag, String type, Vertex callingVertex)
       throws JSONException, Exception {
     // print vertexname
-    if (parser.printSet.contains(this) && !hasMultiReduceOp) {
+    if (parser.printSet.contains(this) && numReduceOp <= 1) {
       if (type != null) {
         printer.println(TezJsonParser.prefixString(indentFlag, "<-")
             + " Please refer to the previous " + this.name + " [" + type + "]");
@@ -235,7 +232,7 @@ public final class Vertex implements Comparable<Vertex>{
       printer.println(TezJsonParser.prefixString(indentFlag) + this.name + this.executionMode);
     }
     // print operators
-    if (hasMultiReduceOp && !(callingVertex.vertexType == VertexType.UNION)) {
+    if (numReduceOp > 1 && !(callingVertex.vertexType == VertexType.UNION)) {
       // find the right op
       Op choose = null;
       for (Op op : this.rootOps) {
@@ -273,16 +270,15 @@ public final class Vertex implements Comparable<Vertex>{
    */
   public void checkMultiReduceOperator() {
     // check if it is a reduce vertex and its children is more than 1;
-    if (!this.name.contains("Reduce") || this.rootOps.size() < 2) {
+    if (this.rootOps.size() < 2) {
       return;
     }
     // check if all the child ops are reduce output operators
     for (Op op : this.rootOps) {
-      if (op.type != OpType.RS) {
-        return;
+      if (op.type == OpType.RS) {
+        numReduceOp++;
       }
     }
-    this.hasMultiReduceOp = true;
   }
 
   public void setType(String type) {
@@ -304,28 +300,35 @@ public final class Vertex implements Comparable<Vertex>{
     }
   }
 
-  //The following code should be gone after HIVE-11075 using topological order
+  // The following code should be gone after HIVE-11075 using topological order
   @Override
   public int compareTo(Vertex o) {
-    return this.name.compareTo(o.name);
+    // vertices with more reduce sink (RS) root operators sort, and so print, first; ties fall back to name order.
+    if (numReduceOp != o.numReduceOp) {
+      return -(numReduceOp - o.numReduceOp);
+    } else {
+      return this.name.compareTo(o.name);
+    }
   }
 
-  public Op getSingleRSOp() {
+  public Op getJoinRSOp(Vertex joinVertex) {
     if (rootOps.size() == 0) {
       return null;
+    } else if (rootOps.size() == 1) {
+      if (rootOps.get(0).type == OpType.RS) {
+        return rootOps.get(0);
+      } else {
+        return null;
+      }
     } else {
-      Op ret = null;
       for (Op op : rootOps) {
         if (op.type == OpType.RS) {
-          if (ret == null) {
-            ret = op;
-          } else {
-            // find more than one RS Op
-            return null;
+          if (op.outputVertexName.equals(joinVertex.name)) {
+            return op;
           }
         }
       }
-      return ret;
+      return null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 116d0eb..d684ba8 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -407,6 +407,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
 minillap.query.files=acid_bucket_pruning.q,\
   bucket5.q,\
   bucket6.q,\
+  dynamic_semijoin_user_level.q,\
   except_distinct.q,\
   explainuser_2.q,\
   empty_dir_in_table.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
new file mode 100644
index 0000000..88ab46e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
@@ -0,0 +1,106 @@
+set hive.explain.user=true;
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.stats.autogather=true;
+set hive.tez.bigtable.minsize.semijoin.reduction=1;
+set hive.tez.min.bloom.filter.entries=1;
+set hive.stats.fetch.column.stats=true;
+
+-- Create Tables
+create table alltypesorc_int ( cint int, cstring string ) stored as ORC;
+create table srcpart_date (key string, value string) partitioned by (ds string ) stored as ORC;
+CREATE TABLE srcpart_small(key1 STRING, value1 STRING) partitioned by (ds string) STORED as ORC;
+
+-- Add Partitions
+alter table srcpart_date add partition (ds = "2008-04-08");
+alter table srcpart_date add partition (ds = "2008-04-09");
+
+alter table srcpart_small add partition (ds = "2008-04-08");
+alter table srcpart_small add partition (ds = "2008-04-09");
+
+-- Load
+insert overwrite table alltypesorc_int select cint, cstring1 from alltypesorc;
+insert overwrite table srcpart_date partition (ds = "2008-04-08" ) select key, value from srcpart where ds = "2008-04-08";
+insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09";
+insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" limit 20;
+
+set hive.tez.dynamic.semijoin.reduction=false;
+
+analyze table alltypesorc_int compute statistics for columns;
+analyze table srcpart_date compute statistics for columns;
+analyze table srcpart_small compute statistics for columns;
+
+-- single column, single key
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=true;
+
+-- Mix dynamic partition pruning(DPP) and min/max bloom filter optimizations. Should pick the DPP.
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.ds);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.ds);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+--multiple sources, single key
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- single source, multiple keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- multiple sources, different  keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+
+-- Explain extended to verify fast start for Reducer in semijoin branch
+EXPLAIN extended select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- With Mapjoins.
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=100000000000;
+
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- multiple sources, different  keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+--set hive.tez.dynamic.semijoin.reduction=false;
+
+-- With unions
+explain select * from alltypesorc_int join
+                                      (select srcpart_date.key as key from srcpart_date
+                                       union all
+                                       select srcpart_small.key1 as key from srcpart_small) unionsrc on (alltypesorc_int.cstring = unionsrc.key);
+
+
+drop table srcpart_date;
+drop table srcpart_small;
+drop table alltypesorc_int;

http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q b/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
new file mode 100644
index 0000000..2532f81
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
@@ -0,0 +1,16 @@
+set hive.fetch.task.conversion=more;
+set hive.stats.column.autogather=true;
+
+-- test for NaN (not-a-number)
+create table tstTbl1(n double);
+
+insert overwrite table tstTbl1
+select 'NaN' from src tablesample (1 rows);
+
+select * from tstTbl1;
+
+select round(n, 1) from tstTbl1;
+select round(n) from tstTbl1;
+
+-- test for Infinity
+select round(1/0), round(1/0, 2), round(1.0/0.0), round(1.0/0.0, 2) from src tablesample (1 rows);