Posted to commits@hive.apache.org by li...@apache.org on 2016/12/05 11:12:51 UTC

hive git commit: HIVE-15239: hive on spark combine equivalent work get wrong result because of TS operation compare (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master eb5dde218 -> c4380fae1


HIVE-15239: hive on spark combine equivalent work get wrong result because of TS operation compare (Rui reviewed by Xuefu)


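For context: before this patch, CombineEquivalentWorkResolver treated two works as equivalent whenever their operator DAGs matched, without checking which paths and partitions the TableScan (TS) operators actually read, so a scan of one table could silently replace a scan of another. A minimal, self-contained sketch of the failure mode (plain java.util types stand in for Hive's MapWork and PartitionDesc; the class and method names below are illustrative, not Hive APIs):

import java.util.HashMap;
import java.util.Map;

public class CombineSketch {
  // Pre-fix check: operator trees only.
  static boolean equivalentBeforeFix(String tree1, String tree2) {
    return tree1.equals(tree2);
  }

  // Post-fix check: operator trees plus scanned paths and partition
  // descriptors, mirroring the compareMapWork() added in this commit.
  static boolean equivalentAfterFix(String tree1, Map<String, String> paths1,
                                    String tree2, Map<String, String> paths2) {
    return tree1.equals(tree2) && paths1.equals(paths2);
  }

  public static void main(String[] args) {
    String tree = "TS->FIL->SEL";  // both works have identical operator trees
    Map<String, String> pathsA1 = new HashMap<>();
    Map<String, String> pathsA2 = new HashMap<>();
    pathsA1.put("/warehouse/a1/end_dt=20161020", "partition of a1");
    pathsA2.put("/warehouse/a2", "table a2");
    System.out.println(equivalentBeforeFix(tree, tree));                  // true: wrongly combined
    System.out.println(equivalentAfterFix(tree, pathsA1, tree, pathsA2)); // false: kept separate
  }
}
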
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c4380fae
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c4380fae
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c4380fae

Branch: refs/heads/master
Commit: c4380fae11ad6e26137471c4d3c67c60feb91cab
Parents: eb5dde2
Author: Rui Li <li...@apache.org>
Authored: Mon Dec 5 19:12:21 2016 +0800
Committer: Rui Li <sh...@cn.ibm.com>
Committed: Mon Dec 5 19:12:21 2016 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   3 +-
 .../spark/CombineEquivalentWorkResolver.java    |  25 ++++
 .../hadoop/hive/ql/plan/PartitionDesc.java      |  68 +++++++++++
 .../hive/ql/plan/VectorPartitionDesc.java       |  34 ++++++
 .../spark_combine_equivalent_work.q             |  33 ++++++
 .../spark/spark_combine_equivalent_work.q.out   | 115 +++++++++++++++++++
 6 files changed, 277 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 27ece51..772e123 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1468,7 +1468,8 @@ spark.query.files=add_part_multiple.q, \
 
 # Unlike "spark.query.files" above, these tests only run
 # under Spark engine.
-spark.only.query.files=spark_dynamic_partition_pruning.q,\
+spark.only.query.files=spark_combine_equivalent_work.q,\
+  spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_vectorized_dynamic_partition_pruning.q
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 41e9ba6..ec192a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -29,6 +29,9 @@ import java.util.Stack;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -210,6 +213,11 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
         return false;
       }
 
+      // need to check paths and partition desc for MapWorks
+      if (first instanceof MapWork && !compareMapWork((MapWork) first, (MapWork) second)) {
+        return false;
+      }
+
       Set<Operator<?>> firstRootOperators = first.getAllRootOperators();
       Set<Operator<?>> secondRootOperators = second.getAllRootOperators();
       if (firstRootOperators.size() != secondRootOperators.size()) {
@@ -228,6 +236,23 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       return true;
     }
 
+    private boolean compareMapWork(MapWork first, MapWork second) {
+      Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
+      Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
+      if (pathToPartition1.size() == pathToPartition2.size()) {
+        for (Map.Entry<Path, PartitionDesc> entry : pathToPartition1.entrySet()) {
+          Path path1 = entry.getKey();
+          PartitionDesc partitionDesc1 = entry.getValue();
+          PartitionDesc partitionDesc2 = pathToPartition2.get(path1);
+          if (!partitionDesc1.equals(partitionDesc2)) {
+            return false;
+          }
+        }
+        return true;
+      }
+      return false;
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);

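A note on compareMapWork() above: the size check plus per-key probe suffices for map equality here. If both maps have the same size and every path in the first maps to an equal PartitionDesc in the second, the mappings coincide; a path absent from the second map makes get() return null, and equals(null) is false, so the works stay separate. A hedged sketch of the same idiom with generic stand-ins (assumes non-null values, as pathToPartitionInfo provides):

import java.util.HashMap;
import java.util.Map;

public class MapProbeSketch {
  static <K, V> boolean sameMappings(Map<K, V> m1, Map<K, V> m2) {
    if (m1.size() != m2.size()) {
      return false;
    }
    for (Map.Entry<K, V> e : m1.entrySet()) {
      V other = m2.get(e.getKey());      // null when the key is absent
      if (!e.getValue().equals(other)) { // equals(null) is false
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> m1 = new HashMap<>();
    Map<String, String> m2 = new HashMap<>();
    m1.put("/a1/end_dt=20161020", "desc1");
    m2.put("/a2", "desc2");
    System.out.println(sameMappings(m1, m2)); // false: different scan paths
  }
}
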
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index 921461f..73981e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -298,6 +298,74 @@ public class PartitionDesc implements Serializable, Cloneable {
     return ret;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    boolean cond = o instanceof PartitionDesc;
+    if (!cond) {
+      return false;
+    }
+
+    PartitionDesc other = (PartitionDesc) o;
+    Class<? extends InputFormat> input1 = getInputFileFormatClass();
+    Class<? extends InputFormat> input2 = other.getInputFileFormatClass();
+    cond = (input1 == null && input2 == null) || (input1 != null && input1.equals(input2));
+    if (!cond) {
+      return false;
+    }
+
+    Class<? extends OutputFormat> output1 = getOutputFileFormatClass();
+    Class<? extends OutputFormat> output2 = other.getOutputFileFormatClass();
+    cond = (output1 == null && output2 == null) || (output1 != null && output1.equals(output2));
+    if (!cond) {
+      return false;
+    }
+
+    Properties properties1 = getProperties();
+    Properties properties2 = other.getProperties();
+    cond = (properties1 == null && properties2 == null) ||
+        (properties1 != null && properties1.equals(properties2));
+    if (!cond) {
+      return false;
+    }
+
+    TableDesc tableDesc1 = getTableDesc();
+    TableDesc tableDesc2 = other.getTableDesc();
+    cond = (tableDesc1 == null && tableDesc2 == null) ||
+        (tableDesc1 != null && tableDesc1.equals(tableDesc2));
+    if (!cond) {
+      return false;
+    }
+
+    Map<String, String> partSpec1 = getPartSpec();
+    Map<String, String> partSpec2 = other.getPartSpec();
+    cond = (partSpec1 == null && partSpec2 == null) ||
+        (partSpec1 != null && partSpec1.equals(partSpec2));
+    if (!cond) {
+      return false;
+    }
+
+    VectorPartitionDesc vecPartDesc1 = getVectorPartitionDesc();
+    VectorPartitionDesc vecPartDesc2 = other.getVectorPartitionDesc();
+    return (vecPartDesc1 == null && vecPartDesc2 == null) ||
+        (vecPartDesc1 != null && vecPartDesc1.equals(vecPartDesc2));
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getInputFileFormatClass() == null ? 0 : getInputFileFormatClass().hashCode());
+    result = result * prime +
+        (getOutputFileFormatClass() == null ? 0 : getOutputFileFormatClass().hashCode());
+    result = result * prime + (getProperties() == null ? 0 : getProperties().hashCode());
+    result = result * prime + (getTableDesc() == null ? 0 : getTableDesc().hashCode());
+    result = result * prime + (getPartSpec() == null ? 0 : getPartSpec().hashCode());
+    result = result * prime +
+        (getVectorPartitionDesc() == null ? 0 : getVectorPartitionDesc().hashCode());
+    return result;
+  }
+
   /**
    * Attempt to derive a virtual <code>base file name</code> property from the
    * path. If path format is unrecognized, just use the full path.

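Because PartitionDesc now overrides equals(), the commit overrides hashCode() as well, keeping the two consistent so equal descriptors behave correctly as keys in hash-based collections. As an aside (not part of the patch), each null-safe clause above is equivalent to the java.util.Objects helpers available since Java 7:

import java.util.Objects;
import java.util.Properties;

public class NullSafeSketch {
  public static void main(String[] args) {
    Properties p1 = null;
    Properties p2 = null;
    // Same result as "(p1 == null && p2 == null) || (p1 != null && p1.equals(p2))"
    System.out.println(Objects.equals(p1, p2)); // true: both null
    // Same result as "p1 == null ? 0 : p1.hashCode()"
    System.out.println(Objects.hashCode(p1));   // 0 for null
    // Objects.hash(...) combines terms with the same prime-31 scheme used above
    System.out.println(Objects.hash(p1, p2));   // 961
  }
}
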
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
index 2b61ec0..7bf70c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/VectorPartitionDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 /**
@@ -156,6 +157,39 @@ public class VectorPartitionDesc  {
     return result;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof VectorPartitionDesc) {
+      VectorPartitionDesc other = (VectorPartitionDesc) o;
+      return Strings.nullToEmpty(getInputFileFormatClassName()).equals(
+          Strings.nullToEmpty(other.getInputFileFormatClassName())) &&
+          Strings.nullToEmpty(getRowDeserializerClassName()).equals(
+              Strings.nullToEmpty(other.getRowDeserializerClassName())) &&
+          getVectorDeserializeType() == other.getVectorDeserializeType() &&
+          getVectorMapOperatorReadType() == other.getVectorMapOperatorReadType() &&
+          getIsInputFileFormatSelfDescribing() == other.getIsInputFileFormatSelfDescribing() &&
+          Arrays.equals(getDataTypeInfos(), other.getDataTypeInfos());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = result * prime +
+        (getInputFileFormatClassName() == null ? 0 : getInputFileFormatClassName().hashCode());
+    result = result * prime +
+        (getRowDeserializerClassName() == null ? 0 : getRowDeserializerClassName().hashCode());
+    result = result * prime +
+        (getVectorDeserializeType() == null ? 0 : getVectorDeserializeType().hashCode());
+    result = result * prime +
+        (getVectorMapOperatorReadType() == null ? 0 : getVectorMapOperatorReadType().hashCode());
+    result = result * prime + Boolean.valueOf(getIsInputFileFormatSelfDescribing()).hashCode();
+    result = result * prime + Arrays.hashCode(getDataTypeInfos());
+    return result;
+  }
+
   public VectorMapOperatorReadType getVectorMapOperatorReadType() {
     return vectorMapOperatorReadType;
   }

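Two idioms in the block above are worth spelling out: Guava's Strings.nullToEmpty makes the class-name comparisons null-safe (with the deliberate side effect that a null name and an empty name compare equal), and Arrays.equals / Arrays.hashCode compare the dataTypeInfos array by content rather than by reference. A minimal sketch (assumes Guava on the classpath, as the patch itself does):

import java.util.Arrays;
import com.google.common.base.Strings;

public class ArrayAndNullSketch {
  public static void main(String[] args) {
    String a = null;
    String b = "";
    // null and "" are treated as the same class name
    System.out.println(Strings.nullToEmpty(a).equals(Strings.nullToEmpty(b))); // true

    int[] x = {1, 2};
    int[] y = {1, 2};
    System.out.println(x == y);                                   // false: reference compare
    System.out.println(Arrays.equals(x, y));                      // true: content compare
    System.out.println(Arrays.hashCode(x) == Arrays.hashCode(y)); // true: consistent hashing
  }
}
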
http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
new file mode 100644
index 0000000..f85d8ee
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work.q
@@ -0,0 +1,33 @@
+set hive.vectorized.execution.enabled = false;
+
+create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string);
+create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string);
+
+alter table a1 add partition(END_DT='20161020');
+alter table a1 add partition(END_DT='20161021');
+
+insert into table a1 partition(END_DT='20161020') values('2000721360','20161001');
+
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1;
+
+DROP TABLE a1;
+DROP TABLE a2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/c4380fae/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
new file mode 100644
index 0000000..93d07d6
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work.q.out
@@ -0,0 +1,115 @@
+PREHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a1
+POSTHOOK: query: create table a1(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a1
+PREHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@a2
+POSTHOOK: query: create table a2(KEHHAO string, START_DT string) partitioned by (END_DT string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@a2
+PREHOOK: query: alter table a1 add partition(END_DT='20161020')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161020')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161020
+PREHOOK: query: alter table a1 add partition(END_DT='20161021')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@a1
+POSTHOOK: query: alter table a1 add partition(END_DT='20161021')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@a1
+POSTHOOK: Output: default@a1@end_dt=20161021
+PREHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: query: insert into table a1 partition(END_DT='20161020') values('2000721360','20161001')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@a1@end_dt=20161020
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).kehhao SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: a1 PARTITION(end_dt=20161020).start_dt SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+PREHOOK: Input: default@a2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a2 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+POSTHOOK: Input: default@a2
+#### A masked pattern was here ####
+PREHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@a1
+PREHOOK: Input: default@a1@end_dt=20161020
+PREHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT T1.KEHHAO,COUNT(1) FROM (
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+UNION ALL
+SELECT KEHHAO FROM a1 T
+WHERE T.KEHHAO = '2000721360' AND '20161018' BETWEEN T.START_DT AND T.END_DT-1
+) T1
+GROUP BY T1.KEHHAO
+HAVING COUNT(1)>1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@a1
+POSTHOOK: Input: default@a1@end_dt=20161020
+POSTHOOK: Input: default@a1@end_dt=20161021
+#### A masked pattern was here ####
+2000721360	2
+PREHOOK: query: DROP TABLE a1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a1
+PREHOOK: Output: default@a1
+POSTHOOK: query: DROP TABLE a1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a1
+POSTHOOK: Output: default@a1
+PREHOOK: query: DROP TABLE a2
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@a2
+PREHOOK: Output: default@a2
+POSTHOOK: query: DROP TABLE a2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@a2
+POSTHOOK: Output: default@a2