Posted to commits@spark.apache.org by jo...@apache.org on 2015/07/08 03:36:48 UTC

spark git commit: [SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

Repository: spark
Updated Branches:
  refs/heads/master 4ca90935c -> 68a4a1697


[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns

https://issues.apache.org/jira/browse/SPARK-8868

Author: Yin Huai <yh...@databricks.com>

Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:

cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.
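
As a quick illustration of what the last commit above changes, here is a
hedged sketch using names from the patch below (SparkSqlSerializer2 is
private[sql], so this assumes code living inside org.apache.spark.sql):

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.execution.SparkSqlSerializer2

    // Before this patch, support() returned true for a schema whose fields
    // are all NullTypes; after it, such schemas are rejected, so the shuffle
    // falls back to SparkSqlSerializer instead of looping in serializer2.
    val allNulls: Array[DataType] = Array(NullType, NullType, NullType)
    val mixed: Array[DataType] = Array(NullType, IntegerType)

    assert(!SparkSqlSerializer2.support(allNulls)) // all NullTypes -> unsupported
    assert(SparkSqlSerializer2.support(mixed))     // has a concrete type -> supported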


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68a4a169
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68a4a169
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68a4a169

Branch: refs/heads/master
Commit: 68a4a169714e11d8c537ad9431ae9974f6b7e8d3
Parents: 4ca9093
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Jul 7 18:36:35 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jul 7 18:36:35 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/SparkSqlSerializer2.scala     | 25 ++++++++++++++++----
 .../execution/SparkSqlSerializer2Suite.scala    | 20 +++++++++++++++-
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 056d435..6ed822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -179,23 +179,38 @@ private[sql] object SparkSqlSerializer2 {
 
   /**
    * Check if rows with the given schema can be serialized with ShuffleSerializer.
+   * Right now, we do not support a schema that contains complex types or UDTs, or whose
+   * fields are all of NullType.
    */
   def support(schema: Array[DataType]): Boolean = {
     if (schema == null) return true
 
+    var allNullTypes = true
     var i = 0
     while (i < schema.length) {
       schema(i) match {
-        case udt: UserDefinedType[_] => return false
-        case array: ArrayType => return false
-        case map: MapType => return false
-        case struct: StructType => return false
+        case NullType => // Do nothing
+        case udt: UserDefinedType[_] =>
+          allNullTypes = false
+          return false
+        case array: ArrayType =>
+          allNullTypes = false
+          return false
+        case map: MapType =>
+          allNullTypes = false
+          return false
+        case struct: StructType =>
+          allNullTypes = false
+          return false
         case _ =>
+          allNullTypes = false
       }
       i += 1
     }
 
-    return true
+    // If all fields are of NullType, return false;
+    // otherwise, return true.
+    return !allNullTypes
   }
 
   /**
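
For readers skimming the hunk above: the while loop tracks whether any field
has a concrete type, returning false early for complex types and UDTs. A
hypothetical, behaviorally equivalent formulation using Scala collections
(the committed version keeps the explicit loop, matching the file's existing
style) would be:

    import org.apache.spark.sql.types._

    // Equivalent check, assuming the same DataType hierarchy as the patch:
    def supportSketch(schema: Array[DataType]): Boolean = {
      if (schema == null) return true
      // Complex types and UDTs are not supported.
      val hasUnsupported = schema.exists {
        case _: UserDefinedType[_] | _: ArrayType | _: MapType | _: StructType => true
        case _ => false
      }
      // A schema whose fields are all NullTypes (including an empty schema)
      // is also rejected.
      !hasUnsupported && !schema.forall(_ == NullType)
    }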

http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
index 8631e24..71f6b26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
@@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   }
 
   checkSupported(null, isSupported = true)
-  checkSupported(NullType, isSupported = true)
   checkSupported(BooleanType, isSupported = true)
   checkSupported(ByteType, isSupported = true)
   checkSupported(ShortType, isSupported = true)
@@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
   checkSupported(DecimalType(10, 5), isSupported = true)
   checkSupported(DecimalType.Unlimited, isSupported = true)
 
+  // If NullType is the only data type in the schema, we do not support it.
+  checkSupported(NullType, isSupported = false)
   // For now, ArrayType, MapType, and StructType are not supported.
   checkSupported(ArrayType(DoubleType, true), isSupported = false)
   checkSupported(ArrayType(StringType, false), isSupported = false)
@@ -170,6 +171,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
     val df = ctx.sql(s"SELECT 1 + 1 FROM shuffle")
     checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
   }
+
+  test("types of fields are all NullTypes") {
+    // Test range partitioning code path.
+    val nulls = ctx.sql(s"SELECT null as a, null as b, null as c")
+    val df = nulls.unionAll(nulls).sort("a")
+    checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+    checkAnswer(
+      df,
+      Row(null, null, null) :: Row(null, null, null) :: Nil)
+
+    // Test hash partitioning code path.
+    val oneRow = ctx.sql(s"SELECT DISTINCT null, null, null FROM shuffle")
+    checkSerializer(oneRow.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+    checkAnswer(
+      oneRow,
+      Row(null, null, null))
+  }
 }
 
 /** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */
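
The two new tests map directly onto the failure mode in the JIRA title. As a
minimal repro-style sketch (assuming a SQLContext named ctx and a registered
table named shuffle, as in the suite):

    // Range partitioning path: sorting rows whose columns are all NullType.
    val nulls = ctx.sql("SELECT null AS a, null AS b, null AS c")
    nulls.unionAll(nulls).sort("a").collect() // could previously loop forever

    // Hash partitioning path: DISTINCT over all-null columns.
    ctx.sql("SELECT DISTINCT null, null, null FROM shuffle").collect()

    // With this patch, both plans use SparkSqlSerializer instead of
    // SparkSqlSerializer2, as the updated checkSerializer assertions verify.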

