You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/31 02:22:54 UTC

spark git commit: [SPARK-6319][SQL] Throw AnalysisException when using BinaryType on Join and Aggregate

Repository: spark
Updated Branches:
  refs/heads/master 0b1a464b6 -> 351eda0e2


[SPARK-6319][SQL] Throw AnalysisException when using BinaryType on Join and Aggregate

JIRA: https://issues.apache.org/jira/browse/SPARK-6319

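Spark SQL uses plain byte arrays to represent binary values. However, these arrays are compared by reference rather than by value, so two binary values with identical contents are not treated as equal. Thus, we should not allow BinaryType in Join conditions or Aggregate grouping expressions in the current implementation.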

Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #7787 from viirya/agg_no_binary_type and squashes the following commits:

4f76cac [Liang-Chi Hsieh] Throw AnalysisException when using BinaryType on Join and Aggregate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/351eda0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/351eda0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/351eda0e

Branch: refs/heads/master
Commit: 351eda0e2fd47c183c4298469970032097ad07a0
Parents: 0b1a464
Author: Liang-Chi Hsieh <vi...@appier.com>
Authored: Thu Jul 30 17:22:51 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 30 17:22:51 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 20 ++++++++++++++++++++
 .../spark/sql/DataFrameAggregateSuite.scala     | 11 ++++++++++-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  9 +++++++++
 3 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/351eda0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a373714..0ebc3d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -87,6 +87,18 @@ trait CheckAnalysis {
               s"join condition '${condition.prettyString}' " +
                 s"of type ${condition.dataType.simpleString} is not a boolean.")
 
+          case j @ Join(_, _, _, Some(condition)) => // reject binary-typed operands
+            def checkValidJoinConditionExprs(expr: Expression): Unit = expr match {
+              case p: Predicate => // descend into the predicate's operands
+                p.asInstanceOf[Expression].children.foreach(checkValidJoinConditionExprs)
+              case e if e.dataType.isInstanceOf[BinaryType] => // byte arrays compare by reference
+                failAnalysis(s"expression ${e.prettyString} in join condition " +
+                  s"'${condition.prettyString}' can't be binary type.")
+              case _ => // OK: children of other expressions are not inspected
+            }
+
+            checkValidJoinConditionExprs(condition)
+
           case Aggregate(groupingExprs, aggregateExprs, child) =>
             def checkValidAggregateExpression(expr: Expression): Unit = expr match {
               case _: AggregateExpression => // OK
@@ -100,7 +112,15 @@ trait CheckAnalysis {
               case e => e.children.foreach(checkValidAggregateExpression)
             }
 
+            // Byte arrays are compared by reference, so binary values cannot be grouping keys.
+            def checkValidGroupingExprs(expr: Expression): Unit =
+              if (expr.dataType == BinaryType) {
+                failAnalysis(s"grouping expression '${expr.prettyString}' in aggregate can " +
+                  s"not be binary type.")
+              }
+
             aggregateExprs.foreach(checkValidAggregateExpression)
+            groupingExprs.foreach(checkValidGroupingExprs) // validate the keys, not the outputs
 
           case Sort(orders, _, _) =>
             orders.foreach { order =>

http://git-wip-us.apache.org/repos/asf/spark/blob/351eda0e/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index b26d3ab..228ece8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{BinaryType, DecimalType}
 
 
 class DataFrameAggregateSuite extends QueryTest {
@@ -191,4 +191,13 @@ class DataFrameAggregateSuite extends QueryTest {
       Row(null))
   }
 
+  test("aggregation can't work on binary type") { // SPARK-6319
+    val binaryDf = Seq("1", "1", "2", "2").map(Tuple1(_)).toDF("c").select($"c".cast(BinaryType))
+    intercept[AnalysisException] {
+      binaryDf.groupBy("c").agg(count("*"))
+    }
+    intercept[AnalysisException] {
+      binaryDf.distinct
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/351eda0e/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 666f26b..27c08f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.types.BinaryType
 
 
 class JoinSuite extends QueryTest with BeforeAndAfterEach {
@@ -489,4 +490,12 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         Row(3, 2) :: Nil)
 
   }
+
+  test("Join can't work on binary type") { // SPARK-6319
+    val left = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("c").select($"c" cast BinaryType)
+    val right = Seq(1, 1, 2, 2).map(i => Tuple1(i.toString)).toDF("d").select($"d" cast BinaryType)
+    intercept[AnalysisException] { // columns resolve, so only the binary check can fail
+      left.join(right, ($"c" === $"d"), "full")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org