You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/07/08 03:36:48 UTC
spark git commit: [SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns
Repository: spark
Updated Branches:
refs/heads/master 4ca90935c -> 68a4a1697
[SPARK-8868] SqlSerializer2 can go into infinite loop when row consists only of NullType columns
https://issues.apache.org/jira/browse/SPARK-8868
Author: Yin Huai <yh...@databricks.com>
Closes #7262 from yhuai/SPARK-8868 and squashes the following commits:
cb58780 [Yin Huai] Andrew's comment.
e456857 [Yin Huai] Josh's comments.
5122e65 [Yin Huai] If types of all columns are NullTypes, do not use serializer2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68a4a169
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68a4a169
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68a4a169
Branch: refs/heads/master
Commit: 68a4a169714e11d8c537ad9431ae9974f6b7e8d3
Parents: 4ca9093
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Jul 7 18:36:35 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Jul 7 18:36:35 2015 -0700
----------------------------------------------------------------------
.../sql/execution/SparkSqlSerializer2.scala | 25 ++++++++++++++++----
.../execution/SparkSqlSerializer2Suite.scala | 20 +++++++++++++++-
2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index 056d435..6ed822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -179,23 +179,38 @@ private[sql] object SparkSqlSerializer2 {
/**
* Check if rows with the given schema can be serialized with ShuffleSerializer.
+ * Right now, we do not support a schema that contains complex types or UDTs, or a schema
+ * in which the data types of all fields are NullType.
*/
def support(schema: Array[DataType]): Boolean = {
if (schema == null) return true
+ var allNullTypes = true
var i = 0
while (i < schema.length) {
schema(i) match {
- case udt: UserDefinedType[_] => return false
- case array: ArrayType => return false
- case map: MapType => return false
- case struct: StructType => return false
+ case NullType => // Do nothing
+ case udt: UserDefinedType[_] =>
+ allNullTypes = false
+ return false
+ case array: ArrayType =>
+ allNullTypes = false
+ return false
+ case map: MapType =>
+ allNullTypes = false
+ return false
+ case struct: StructType =>
+ allNullTypes = false
+ return false
case _ =>
+ allNullTypes = false
}
i += 1
}
- return true
+ // If types of fields are all NullTypes, we return false.
+ // Otherwise, we return true.
+ return !allNullTypes
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/68a4a169/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
index 8631e24..71f6b26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlSerializer2Suite.scala
@@ -42,7 +42,6 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
}
checkSupported(null, isSupported = true)
- checkSupported(NullType, isSupported = true)
checkSupported(BooleanType, isSupported = true)
checkSupported(ByteType, isSupported = true)
checkSupported(ShortType, isSupported = true)
@@ -57,6 +56,8 @@ class SparkSqlSerializer2DataTypeSuite extends SparkFunSuite {
checkSupported(DecimalType(10, 5), isSupported = true)
checkSupported(DecimalType.Unlimited, isSupported = true)
+ // If NullType is the only data type in the schema, we do not support it.
+ checkSupported(NullType, isSupported = false)
// For now, ArrayType, MapType, and StructType are not supported.
checkSupported(ArrayType(DoubleType, true), isSupported = false)
checkSupported(ArrayType(StringType, false), isSupported = false)
@@ -170,6 +171,23 @@ abstract class SparkSqlSerializer2Suite extends QueryTest with BeforeAndAfterAll
val df = ctx.sql(s"SELECT 1 + 1 FROM shuffle")
checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
}
+
+ test("types of fields are all NullTypes") {
+ // Test range partitioning code path.
+ val nulls = ctx.sql(s"SELECT null as a, null as b, null as c")
+ val df = nulls.unionAll(nulls).sort("a")
+ checkSerializer(df.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+ checkAnswer(
+ df,
+ Row(null, null, null) :: Row(null, null, null) :: Nil)
+
+ // Test hash partitioning code path.
+ val oneRow = ctx.sql(s"SELECT DISTINCT null, null, null FROM shuffle")
+ checkSerializer(oneRow.queryExecution.executedPlan, classOf[SparkSqlSerializer])
+ checkAnswer(
+ oneRow,
+ Row(null, null, null))
+ }
}
/** Tests SparkSqlSerializer2 with sort based shuffle without sort merge. */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org