You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2017/04/23 03:46:49 UTC
[5/5] hive git commit: HIVE-16421 Runtime filtering breaks user-level
explain (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
HIVE-16421 Runtime filtering breaks user-level explain (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eaa439e3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eaa439e3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eaa439e3
Branch: refs/heads/master
Commit: eaa439e394514de0c38602d01a98d2e6237c0da8
Parents: 0b6a48d
Author: Pengcheng Xiong <px...@hortonworks.com>
Authored: Sat Apr 22 20:46:22 2017 -0700
Committer: Pengcheng Xiong <px...@hortonworks.com>
Committed: Sat Apr 22 20:46:22 2017 -0700
----------------------------------------------------------------------
.../hive/common/jsonexplain/tez/Connection.java | 7 +-
.../hadoop/hive/common/jsonexplain/tez/Op.java | 58 +-
.../hive/common/jsonexplain/tez/Vertex.java | 45 +-
.../test/resources/testconfiguration.properties | 1 +
.../dynamic_semijoin_user_level.q | 106 ++
.../clientpositive/udf_round_2_auto_stats.q | 16 +
.../llap/dynamic_semijoin_user_level.q.out | 1495 ++++++++++++++++
.../clientpositive/llap/explainuser_2.q.out | 1624 +++++-------------
.../clientpositive/udf_round_2_auto_stats.q.out | 55 +
9 files changed, 2206 insertions(+), 1201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
index d341cb1..5cd0e4c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Connection.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.common.jsonexplain.tez;
-public final class Connection {
+public final class Connection implements Comparable<Connection>{
public final String type;
public final Vertex from;
@@ -27,4 +27,9 @@ public final class Connection {
this.type = type;
this.from = from;
}
+
+ @Override
+ public int compareTo(Connection o) {
+ return from.compareTo(o.from);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
index 718791c..96e75c0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Op.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.common.jsonexplain.tez;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -120,19 +121,18 @@ public final class Op {
for (String key : JSONObject.getNames(keys)) {
// first search from the posToVertex
if (posToVertex.containsKey(key)) {
- Vertex vertex = posToVertex.get(key);
- if (vertex.rootOps.size() == 1) {
- posToOpId.put(key, vertex.rootOps.get(0).operatorId);
- } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
- posToOpId.put(key, vertex.name);
+ Vertex v = posToVertex.get(key);
+ if (v.rootOps.size() == 1) {
+ posToOpId.put(key, v.rootOps.get(0).operatorId);
+ } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+ posToOpId.put(key, v.name);
} else {
- Op singleRSOp = vertex.getSingleRSOp();
- if (singleRSOp != null) {
- posToOpId.put(key, singleRSOp.operatorId);
+ Op joinRSOp = v.getJoinRSOp(vertex);
+ if (joinRSOp != null) {
+ posToOpId.put(key, joinRSOp.operatorId);
} else {
throw new Exception(
- "There are none or more than one root operators in a single vertex "
- + vertex.name
+ "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+ " when hive explain user is trying to identify the operator id.");
}
}
@@ -143,20 +143,19 @@ public final class Op {
}
// then assume it is from its own vertex
else if (parentVertexes.size() == 1) {
- Vertex vertex = parentVertexes.iterator().next();
+ Vertex v = parentVertexes.iterator().next();
parentVertexes.clear();
- if (vertex.rootOps.size() == 1) {
- posToOpId.put(key, vertex.rootOps.get(0).operatorId);
- } else if ((vertex.rootOps.size() == 0 && vertex.vertexType == VertexType.UNION)) {
- posToOpId.put(key, vertex.name);
+ if (v.rootOps.size() == 1) {
+ posToOpId.put(key, v.rootOps.get(0).operatorId);
+ } else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
+ posToOpId.put(key, v.name);
} else {
- Op singleRSOp = vertex.getSingleRSOp();
- if (singleRSOp != null) {
- posToOpId.put(key, singleRSOp.operatorId);
+ Op joinRSOp = v.getJoinRSOp(vertex);
+ if (joinRSOp != null) {
+ posToOpId.put(key, joinRSOp.operatorId);
} else {
throw new Exception(
- "There are none or more than one root operators in a single vertex "
- + vertex.name
+ "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+ " when hive explain user is trying to identify the operator id.");
}
}
@@ -207,12 +206,12 @@ public final class Op {
} else if ((v.rootOps.size() == 0 && v.vertexType == VertexType.UNION)) {
posToOpId.put(entry.getKey(), v.name);
} else {
- Op singleRSOp = v.getSingleRSOp();
- if (singleRSOp != null) {
- posToOpId.put(entry.getKey(), singleRSOp.operatorId);
+ Op joinRSOp = v.getJoinRSOp(vertex);
+ if (joinRSOp != null) {
+ posToOpId.put(entry.getKey(), joinRSOp.operatorId);
} else {
throw new Exception(
- "There are none or more than one root operators in a single vertex " + v.name
+ "Can not find join reduceSinkOp for " + v.name + " to join " + vertex.name
+ " when hive explain user is trying to identify the operator id.");
}
}
@@ -336,8 +335,9 @@ public final class Op {
}
// print inline vertex
if (parser.inlineMap.containsKey(this)) {
- for (int index = 0; index < parser.inlineMap.get(this).size(); index++) {
- Connection connection = parser.inlineMap.get(this).get(index);
+ List<Connection> connections = parser.inlineMap.get(this);
+ Collections.sort(connections);
+ for (Connection connection : connections) {
connection.from.print(printer, indentFlag, connection.type, this.vertex);
}
}
@@ -347,9 +347,9 @@ public final class Op {
}
// print next vertex
else {
- for (int index = 0; index < noninlined.size(); index++) {
- Vertex v = noninlined.get(index).from;
- v.print(printer, indentFlag, noninlined.get(index).type, this.vertex);
+ Collections.sort(noninlined);
+ for (Connection connection : noninlined) {
+ connection.from.print(printer, indentFlag, connection.type, this.vertex);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
index 3d559bd..13ecac0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
+++ b/common/src/java/org/apache/hadoop/hive/common/jsonexplain/tez/Vertex.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.hive.common.jsonexplain.tez;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hive.common.jsonexplain.tez.Op.OpType;
-import org.apache.hadoop.util.hash.Hash;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.json.JSONArray;
@@ -53,8 +50,8 @@ public final class Vertex implements Comparable<Vertex>{
// we create a dummy vertex for a mergejoin branch for a self join if this
// vertex is a mergejoin
public final List<Vertex> mergeJoinDummyVertexs = new ArrayList<>();
- // whether this vertex has multiple reduce operators
- public boolean hasMultiReduceOp = false;
+ // number of root operators of type RS; counted by checkMultiReduceOperator() only when the vertex has at least two root operators, otherwise 0
+ public int numReduceOp = 0;
// execution mode
public String executionMode = "";
// tagToInput for reduce work
@@ -217,7 +214,7 @@ public final class Vertex implements Comparable<Vertex>{
public void print(Printer printer, int indentFlag, String type, Vertex callingVertex)
throws JSONException, Exception {
// print vertexname
- if (parser.printSet.contains(this) && !hasMultiReduceOp) {
+ if (parser.printSet.contains(this) && numReduceOp <= 1) {
if (type != null) {
printer.println(TezJsonParser.prefixString(indentFlag, "<-")
+ " Please refer to the previous " + this.name + " [" + type + "]");
@@ -235,7 +232,7 @@ public final class Vertex implements Comparable<Vertex>{
printer.println(TezJsonParser.prefixString(indentFlag) + this.name + this.executionMode);
}
// print operators
- if (hasMultiReduceOp && !(callingVertex.vertexType == VertexType.UNION)) {
+ if (numReduceOp > 1 && !(callingVertex.vertexType == VertexType.UNION)) {
// find the right op
Op choose = null;
for (Op op : this.rootOps) {
@@ -273,16 +270,15 @@ public final class Vertex implements Comparable<Vertex>{
*/
public void checkMultiReduceOperator() {
// check if it is a reduce vertex and its children is more than 1;
- if (!this.name.contains("Reduce") || this.rootOps.size() < 2) {
+ if (this.rootOps.size() < 2) {
return;
}
// check if all the child ops are reduce output operators
for (Op op : this.rootOps) {
- if (op.type != OpType.RS) {
- return;
+ if (op.type == OpType.RS) {
+ numReduceOp++;
}
}
- this.hasMultiReduceOp = true;
}
public void setType(String type) {
@@ -304,28 +300,35 @@ public final class Vertex implements Comparable<Vertex>{
}
}
- //The following code should be gone after HIVE-11075 using topological order
+ // The following code should be gone after HIVE-11075 using topological order
@Override
public int compareTo(Vertex o) {
- return this.name.compareTo(o.name);
+ // we print the vertex that has more RS (reduce sink) root operators before the vertex that has fewer.
+ if (numReduceOp != o.numReduceOp) {
+ return -(numReduceOp - o.numReduceOp);
+ } else {
+ return this.name.compareTo(o.name);
+ }
}
- public Op getSingleRSOp() {
+ public Op getJoinRSOp(Vertex joinVertex) {
if (rootOps.size() == 0) {
return null;
+ } else if (rootOps.size() == 1) {
+ if (rootOps.get(0).type == OpType.RS) {
+ return rootOps.get(0);
+ } else {
+ return null;
+ }
} else {
- Op ret = null;
for (Op op : rootOps) {
if (op.type == OpType.RS) {
- if (ret == null) {
- ret = op;
- } else {
- // find more than one RS Op
- return null;
+ if (op.outputVertexName.equals(joinVertex.name)) {
+ return op;
}
}
}
- return ret;
+ return null;
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 116d0eb..d684ba8 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -407,6 +407,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
minillap.query.files=acid_bucket_pruning.q,\
bucket5.q,\
bucket6.q,\
+ dynamic_semijoin_user_level.q,\
except_distinct.q,\
explainuser_2.q,\
empty_dir_in_table.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
new file mode 100644
index 0000000..88ab46e
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/dynamic_semijoin_user_level.q
@@ -0,0 +1,106 @@
+set hive.explain.user=true;
+set hive.compute.query.using.stats=false;
+set hive.mapred.mode=nonstrict;
+set hive.optimize.ppd=true;
+set hive.ppd.remove.duplicatefilters=true;
+set hive.tez.dynamic.partition.pruning=true;
+set hive.tez.dynamic.semijoin.reduction=true;
+set hive.optimize.metadataonly=false;
+set hive.optimize.index.filter=true;
+set hive.stats.autogather=true;
+set hive.tez.bigtable.minsize.semijoin.reduction=1;
+set hive.tez.min.bloom.filter.entries=1;
+set hive.stats.fetch.column.stats=true;
+
+-- Create Tables
+create table alltypesorc_int ( cint int, cstring string ) stored as ORC;
+create table srcpart_date (key string, value string) partitioned by (ds string ) stored as ORC;
+CREATE TABLE srcpart_small(key1 STRING, value1 STRING) partitioned by (ds string) STORED as ORC;
+
+-- Add Partitions
+alter table srcpart_date add partition (ds = "2008-04-08");
+alter table srcpart_date add partition (ds = "2008-04-09");
+
+alter table srcpart_small add partition (ds = "2008-04-08");
+alter table srcpart_small add partition (ds = "2008-04-09");
+
+-- Load
+insert overwrite table alltypesorc_int select cint, cstring1 from alltypesorc;
+insert overwrite table srcpart_date partition (ds = "2008-04-08" ) select key, value from srcpart where ds = "2008-04-08";
+insert overwrite table srcpart_date partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09";
+insert overwrite table srcpart_small partition (ds = "2008-04-09") select key, value from srcpart where ds = "2008-04-09" limit 20;
+
+set hive.tez.dynamic.semijoin.reduction=false;
+
+analyze table alltypesorc_int compute statistics for columns;
+analyze table srcpart_date compute statistics for columns;
+analyze table srcpart_small compute statistics for columns;
+
+-- single column, single key
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=true;
+
+-- Mix dynamic partition pruning(DPP) and min/max bloom filter optimizations. Should pick the DPP.
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.ds);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.ds);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+--multiple sources, single key
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_small.key1 = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- single source, multiple keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1 and srcpart_date.value = srcpart_small.value1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- multiple sources, different keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+
+-- Explain extended to verify fast start for Reducer in semijoin branch
+EXPLAIN extended select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- With Mapjoins.
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=100000000000;
+
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1);
+set hive.tez.dynamic.semijoin.reduction=false;
+
+-- multiple sources, different keys
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+set hive.tez.dynamic.semijoin.reduction=true;
+EXPLAIN select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+select count(*) from srcpart_date join srcpart_small on (srcpart_date.key = srcpart_small.key1) join alltypesorc_int on (srcpart_date.value = alltypesorc_int.cstring);
+--set hive.tez.dynamic.semijoin.reduction=false;
+
+-- With unions
+explain select * from alltypesorc_int join
+ (select srcpart_date.key as key from srcpart_date
+ union all
+ select srcpart_small.key1 as key from srcpart_small) unionsrc on (alltypesorc_int.cstring = unionsrc.key);
+
+
+drop table srcpart_date;
+drop table srcpart_small;
+drop table alltypesorc_int;
http://git-wip-us.apache.org/repos/asf/hive/blob/eaa439e3/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q b/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
new file mode 100644
index 0000000..2532f81
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udf_round_2_auto_stats.q
@@ -0,0 +1,16 @@
+set hive.fetch.task.conversion=more;
+set hive.stats.column.autogather=true;
+
+-- test for NaN (not-a-number)
+create table tstTbl1(n double);
+
+insert overwrite table tstTbl1
+select 'NaN' from src tablesample (1 rows);
+
+select * from tstTbl1;
+
+select round(n, 1) from tstTbl1;
+select round(n) from tstTbl1;
+
+-- test for Infinity
+select round(1/0), round(1/0, 2), round(1.0/0.0), round(1.0/0.0, 2) from src tablesample (1 rows);