You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/23 12:21:04 UTC

spark git commit: [SPARK-18519][SQL][BRANCH-2.0] map type can not be used in EqualTo

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 aefeaa77a -> f8ce884e6


[SPARK-18519][SQL][BRANCH-2.0] map type can not be used in EqualTo

## What changes were proposed in this pull request?

Technically, map type is not orderable, but it could still be used in equality comparisons. However, due to a limitation of the current implementation, map type can't be used in equality comparisons, which means it can't be used as a join key or a grouping key either.

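This PR makes this limitation explicit to avoid wrong results: `EqualTo` and `EqualNullSafe` now fail type checking when their input type contains a map type. The old join-condition check in `CheckAnalysis`, which rejected both binary and map types, is removed, so binary-type expressions can now be used in join conditions.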

This is a backport of https://github.com/apache/spark/pull/15956 to branch-2.0.

## How was this patch tested?
Updated tests in `AnalysisErrorSuite` and `ExpressionTypeCheckingSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #15988 from cloud-fan/map-type.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8ce884e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8ce884e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8ce884e

Branch: refs/heads/branch-2.0
Commit: f8ce884e64dab9e87e5cd28613fff32a3cd415ae
Parents: aefeaa7
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Nov 23 04:20:59 2016 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Nov 23 04:20:59 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 15 -------
 .../sql/catalyst/expressions/predicates.scala   | 30 +++++++++++++
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 44 +++++++-------------
 .../analysis/ExpressionTypeCheckingSuite.scala  |  2 +
 4 files changed, 48 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f8ce884e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e09bebb..cc51c2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -191,21 +191,6 @@ trait CheckAnalysis extends PredicateHelper {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.simpleString} is not a boolean.")
 
-          case j @ Join(_, _, _, Some(condition)) =>
-            def checkValidJoinConditionExprs(expr: Expression): Unit = expr match {
-              case p: Predicate =>
-                p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
-              case e if e.dataType.isInstanceOf[BinaryType] =>
-                failAnalysis(s"binary type expression ${e.sql} cannot be used " +
-                  "in join conditions")
-              case e if e.dataType.isInstanceOf[MapType] =>
-                failAnalysis(s"map type expression ${e.sql} cannot be used " +
-                  "in join conditions")
-              case _ => // OK
-            }
-
-            checkValidJoinConditionExprs(condition)
-
           case Aggregate(groupingExprs, aggregateExprs, child) =>
             def checkValidAggregateExpression(expr: Expression): Unit = expr match {
               case aggExpr: AggregateExpression =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f8ce884e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 40cda5f..8f04c27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -417,6 +417,21 @@ case class EqualTo(left: Expression, right: Expression)
 
   override def inputType: AbstractDataType = AnyDataType
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+    super.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess =>
+        // TODO: although map type is not orderable, technically map type should be able to be used
+        // in equality comparison, remove this type check once we support it.
+        if (left.dataType.existsRecursively(_.isInstanceOf[MapType])) {
+          TypeCheckResult.TypeCheckFailure("Cannot use map type in EqualTo, but the actual " +
+            s"input type is ${left.dataType.catalogString}.")
+        } else {
+          TypeCheckResult.TypeCheckSuccess
+        }
+      case failure => failure
+    }
+  }
+
   override def symbol: String = "="
 
   protected override def nullSafeEval(left: Any, right: Any): Any = ordering.equiv(left, right)
@@ -433,6 +448,21 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
 
   override def inputType: AbstractDataType = AnyDataType
 
+  override def checkInputDataTypes(): TypeCheckResult = {
+    super.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess =>
+        // TODO: although map type is not orderable, technically map type should be able to be used
+        // in equality comparison, remove this type check once we support it.
+        if (left.dataType.existsRecursively(_.isInstanceOf[MapType])) {
+          TypeCheckResult.TypeCheckFailure("Cannot use map type in EqualNullSafe, but the actual " +
+            s"input type is ${left.dataType.catalogString}.")
+        } else {
+          TypeCheckResult.TypeCheckSuccess
+        }
+      case failure => failure
+    }
+  }
+
   override def symbol: String = "<=>"
 
   override def nullable: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/f8ce884e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 6438065..2efb29f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -451,34 +451,22 @@ class AnalysisErrorSuite extends AnalysisTest {
         "another aggregate function." :: Nil)
   }
 
-  test("Join can't work on binary and map types") {
-    val plan =
-      Join(
-        LocalRelation(
-          AttributeReference("a", BinaryType)(exprId = ExprId(2)),
-          AttributeReference("b", IntegerType)(exprId = ExprId(1))),
-        LocalRelation(
-          AttributeReference("c", BinaryType)(exprId = ExprId(4)),
-          AttributeReference("d", IntegerType)(exprId = ExprId(3))),
-        Inner,
-        Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
-          AttributeReference("c", BinaryType)(exprId = ExprId(4)))))
-
-    assertAnalysisError(plan, "binary type expression `a` cannot be used in join conditions" :: Nil)
-
-    val plan2 =
-      Join(
-        LocalRelation(
-          AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
-          AttributeReference("b", IntegerType)(exprId = ExprId(1))),
-        LocalRelation(
-          AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
-          AttributeReference("d", IntegerType)(exprId = ExprId(3))),
-        Inner,
-        Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
-          AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))
-
-    assertAnalysisError(plan2, "map type expression `a` cannot be used in join conditions" :: Nil)
+  test("Join can work on binary types but can't work on map types") {
+    val left = LocalRelation('a.binary, 'b.map(StringType, StringType))
+    val right = LocalRelation('c.binary, 'd.map(StringType, StringType))
+
+    val plan1 = left.join(
+      right,
+      joinType = Inner,
+      condition = Some('a === 'c))
+
+    assertAnalysisSuccess(plan1)
+
+    val plan2 = left.join(
+      right,
+      joinType = Inner,
+      condition = Some('b === 'd))
+    assertAnalysisError(plan2, "Cannot use map type in EqualTo" :: Nil)
   }
 
   test("PredicateSubQuery is used outside of a filter") {

http://git-wip-us.apache.org/repos/asf/spark/blob/f8ce884e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index 3aefb3c..c2327a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -118,6 +118,8 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite {
     assertErrorForDifferingTypes(GreaterThan('intField, 'booleanField))
     assertErrorForDifferingTypes(GreaterThanOrEqual('intField, 'booleanField))
 
+    assertError(EqualTo('mapField, 'mapField), "Cannot use map type in EqualTo")
+    assertError(EqualNullSafe('mapField, 'mapField), "Cannot use map type in EqualNullSafe")
     assertError(LessThan('mapField, 'mapField),
       s"requires ${TypeCollection.Ordered.simpleString} type")
     assertError(LessThanOrEqual('mapField, 'mapField),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org