You are viewing a plain-text version of this content. The link to the canonical (HTML) version was not preserved in this copy.
Posted to commits@hive.apache.org by li...@apache.org on 2016/12/05 11:12:51 UTC
hive git commit: HIVE-15239: hive on spark combine equivalent work
get wrong result because of TS operation compare (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/master eb5dde218 -> c4380fae1
HIVE-15239: hive on spark combine equivalent work get wrong result because of TS operation compare (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c4380fae
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c4380fae
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c4380fae
Branch: refs/heads/master
Commit: c4380fae11ad6e26137471c4d3c67c60feb91cab
Parents: eb5dde2
Author: Rui Li <li...@apache.org>
Authored: Mon Dec 5 19:12:21 2016 +0800
Committer: Rui Li <sh...@cn.ibm.com>
Committed: Mon Dec 5 19:12:21 2016 +0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 3 +-
.../spark/CombineEquivalentWorkResolver.java | 25 ++++
.../hadoop/hive/ql/plan/PartitionDesc.java | 68 +++++++++++
.../hive/ql/plan/VectorPartitionDesc.java | 34 ++++++
.../spark_combine_equivalent_work.q | 33 ++++++
.../spark/spark_combine_equivalent_work.q.out | 115 +++++++++++++++++++
6 files changed, 277 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 27ece51..772e123 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1468,7 +1468,8 @@ spark.query.files=add_part_multiple.q, \
# Unlike "spark.query.files" above, these tests only run
# under Spark engine.
-spark.only.query.files=spark_dynamic_partition_pruning.q,\
+spark.only.query.files=spark_combine_equivalent_work.q,\
+ spark_dynamic_partition_pruning.q,\
spark_dynamic_partition_pruning_2.q,\
spark_vectorized_dynamic_partition_pruning.q
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 41e9ba6..ec192a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -29,6 +29,9 @@ import java.util.Stack;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -210,6 +213,11 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return false;
}
+ // need to check paths and partition desc for MapWorks
+ if (first instanceof MapWork && !compareMapWork((MapWork) first, (MapWork) second)) {
+ return false;
+ }
+
Set<Operator<?>> firstRootOperators = first.getAllRootOperators();
Set<Operator<?>> secondRootOperators = second.getAllRootOperators();
if (firstRootOperators.size() != secondRootOperators.size()) {
@@ -228,6 +236,23 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return true;
}
+ private boolean compareMapWork(MapWork first, MapWork second) {
+ Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
+ Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
+ if (pathToPartition1.size() == pathToPartition2.size()) {
+ for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) {
+ Path path1 = entry.getKey();
+ PartitionDesc partitionDesc1 = entry.getValue();
+ PartitionDesc partitionDesc2 = pathToPartition2.get(path1);
+ if (!partitionDesc1.equals(partitionDesc2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
boolean result = true;
List<BaseWork> firstParents = sparkWork.getParents(first);
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 921461f..73981e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -298,6 +298,74 @@ public class PartitionDesc implements Serializable, Cloneable {
return ret;
}
+ @Override
+ public boolean equals(Object o) {
+ boolean cond = o instanceof PartitionDesc;
+ if (!cond) {
+ return false;
+ }
+
+ PartitionDesc other = (PartitionDesc) o;
+ Class<? extends InputFormat> input1 = getInputFileFormatClass();
+ Class<? extends InputFormat> input2 = other.getInputFileFormatClass();
+ cond = (input1 == null && input2 == null) || (input1 != null && input1.equals(input2));
+ if (!cond) {
+ return false;
+ }
+
+ Class<? extends OutputFormat> output1 = getOutputFileFormatClass();
+ Class<? extends OutputFormat> output2 = other.getOutputFileFormatClass();
+ cond = (output1 == null && output2 == null) || (output1 != null && output1.equals(output2));
+ if (!cond) {
+ return false;
+ }
+
+ Properties properties1 = getProperties();
+ Properties properties2 = other.getProperties();
+ cond = (properties1 == null && properties2 == null) ||
+ (properties1 != null && properties1.equals(properties2));
+ if (!cond) {
+ return false;
+ }
+
+ TableDesc tableDesc1 = getTableDesc();
+ TableDesc tableDesc2 = other.getTableDesc();
+ cond = (tableDesc1 == null && tableDesc2 == null) ||
+ (tableDesc1 != null && tableDesc1.equals(tableDesc2));
+ if (!cond) {
+ return false;
+ }
+
+ Map<String, String> partSpec1 = getPartSpec();
+ Map<String, String> partSpec2 = other.getPartSpec();
+ cond = (partSpec1 == null && partSpec2 == null) ||
+ (partSpec1 != null && partSpec1.equals(partSpec2));
+ if (!cond) {
+ return false;
+ }
+
+ VectorPartitionDesc vecPartDesc1 = getVectorPartitionDesc();
+ VectorPartitionDesc vecPartDesc2 = other.getVectorPartitionDesc();
+ return (vecPartDesc1 == null && vecPartDesc2 == null) ||
+ (vecPartDesc1 != null && vecPartDesc1.equals(vecPartDesc2));
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = result * prime +
+ (getInputFileFormatClass() == null ? 0 : getInputFileFormatClass().hashCode());
+ result = result * prime +
+ (getOutputFileFormatClass() == null ? 0 : getOutputFileFormatClass().hashCode());
+ result = result * prime + (getProperties() == null ? 0 : getProperties().hashCode());
+ result = result * prime + (getTableDesc() == null ? 0 : getTableDesc().hashCode());
+ result = result * prime + (getPartSpec() == null ? 0 : getPartSpec().hashCode());
+ result = result * prime +
+ (getVectorPartitionDesc() == null ? 0 : getVectorPartitionDesc().hashCode());
+ return result;
+ }
+
/**
* Attempt to derive a virtual <code>base file name</code> property from the
* path. If path format is unrecognized, just use the full path.
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
index 2b61ec0..7bf70c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.Arrays;
import java.util.List;
+import com.google.common.base.Strings;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
@@ -156,6 +157,39 @@ public class VectorPartitionDesc {
return result;
}
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof VectorPartitionDesc) {
+ VectorPartitionDesc other = (VectorPartitionDesc) o;
+ return Strings.nullToEmpty(getInputFileFormatClassName()).equals(
+ Strings.nullToEmpty(other.getInputFileFormatClassName())) &&
+ Strings.nullToEmpty(getRowDeserializerClassName()).equals(
+ Strings.nullToEmpty(other.getRowDeserializerClassName())) &&
+ getVectorDeserializeType() == other.getVectorDeserializeType() &&
+ getVectorMapOperatorReadType() == other.getVectorMapOperatorReadType() &&
+ getIsInputFileFormatSelfDescribing() == other.getIsInputFileFormatSelfDescribing() &&
+ Arrays.equals(getDataTypeInfos(), other.getDataTypeInfos());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = result * prime +
+ (getInputFileFormatClassName() == null ? 0 : getInputFileFormatClassName().hashCode());
+ result = result * prime +
+ (getRowDeserializerClassName() == null ? 0 : getRowDeserializerClassName().hashCode());
+ result = result * prime +
+ (getVectorDeserializeType() == null ? 0 : getVectorDeserializeType().hashCode());
+ result = result * prime +
+ (getVectorMapOperatorReadType() == null ? 0 : getVectorMapOperatorReadType().hashCode());
+ result = result * prime + Boolean.valueOf(getIsInputFileFormatSelfDescribing()).hashCode();
+ result = result * prime + Arrays.hashCode(getDataTypeInfos());
+ return result;
+ }
+
public VectorMapOperatorReadType getVectorMapOperatorReadType() {
return vectorMapOperatorReadType;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
new file mode 100644
index 0000000..f85d8ee
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
@@ -0,0 +1,33 @@
+set hive.vectorized.execution.enabled = false;
+
+create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string);
+create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string);
+
+alter table a1 add partition(END_DT='20161020');
+alter table a1 add partition(END_DT='20161021');
+
+insert into table a1 partition(END_DT='20161020') values('2000721360','20161001');
+
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+DROP TABLE a1;
+DROP TABLE a2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
new file mode 100644
index 0000000..93d07d6
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
@@ -0,0 +1,115 @@
+PREHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a1
+POSTHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a1
+PREHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a2
+POSTHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a2
+PREHOOK: query: alter table a1 add partition(END_DT='20161020')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161020')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161020
+PREHOOK: query: alter table a1 add partition(END_DT='20161021')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161021')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161021
+PREHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).kehhao SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).start_dt SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+PREHOOK: Input: default@a2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+POSTHOOK: Input: default@a2
+#### A masked pattern was here ####
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+2000721360 2
+PREHOOK: query: DROP TABLE a1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a1
+PREHOOK: Output: default@a1
+POSTHOOK: query: DROP TABLE a1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a1
+POSTHOOK: Output: default@a1
+PREHOOK: query: DROP TABLE a2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a2
+PREHOOK: Output: default@a2
+POSTHOOK: query: DROP TABLE a2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a2
+POSTHOOK: Output: default@a2