You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/11 04:23:49 UTC

git commit: [SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.

Repository: spark
Updated Branches:
  refs/heads/master f62c42728 -> f5abd2712


[SPARK-2415] [SQL] RowWriteSupport should handle empty ArrayType correctly.

`RowWriteSupport` doesn't write empty `ArrayType` value, so the read value becomes `null`.
It should write empty `ArrayType` value as it is.

Author: Takuya UESHIN <ue...@happy-camper.st>

Closes #1339 from ueshin/issues/SPARK-2415 and squashes the following commits:

32afc87 [Takuya UESHIN] Merge branch 'master' into issues/SPARK-2415
2f05196 [Takuya UESHIN] Fix RowWriteSupport to handle empty ArrayType correctly.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5abd271
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5abd271
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5abd271

Branch: refs/heads/master
Commit: f5abd271292f5c98eb8b1974c1df31d08ed388dd
Parents: f62c427
Author: Takuya UESHIN <ue...@happy-camper.st>
Authored: Thu Jul 10 19:23:44 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jul 10 19:23:44 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/parquet/ParquetConverter.scala | 12 ++++++------
 .../apache/spark/sql/parquet/ParquetTableSupport.scala  | 10 +++++-----
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala    | 10 +++++-----
 3 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5abd271/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 889a408..75748b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -229,9 +229,9 @@ private[parquet] class CatalystGroupConverter(
     this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-    .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 
@@ -288,9 +288,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
       new ParquetRelation.RowType(attributes.length))
 
   protected [parquet] val converters: Array[Converter] =
-    schema.map(field =>
-      CatalystConverter.createConverter(field, schema.indexOf(field), this))
-      .toArray
+    schema.zipWithIndex.map {
+      case (field, idx) => CatalystConverter.createConverter(field, idx, this)
+    }.toArray
 
   override val size = schema.size
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f5abd271/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 9cd5dc5..108f8b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -156,7 +156,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     writer.startMessage()
     while(index < attributes.size) {
       // null values indicate optional fields but we do not check currently
-      if (record(index) != null && record(index) != Nil) {
+      if (record(index) != null) {
         writer.startField(attributes(index).name, index)
         writeValue(attributes(index).dataType, record(index))
         writer.endField(attributes(index).name, index)
@@ -167,7 +167,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case t @ ArrayType(_) => writeArray(
           t,
@@ -184,7 +184,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
-    if (value != null && value != Nil) {
+    if (value != null) {
       schema match {
         case StringType => writer.addBinary(
           Binary.fromByteArray(
@@ -206,12 +206,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeStruct(
       schema: StructType,
       struct: CatalystConverter.StructScalaType[_]): Unit = {
-    if (struct != null && struct != Nil) {
+    if (struct != null) {
       val fields = schema.fields.toArray
       writer.startGroup()
       var i = 0
       while(i < fields.size) {
-        if (struct(i) != null && struct(i) != Nil) {
+        if (struct(i) != null) {
           writer.startField(fields(i).name, i)
           writeValue(fields(i).dataType, struct(i))
           writer.endField(fields(i).name, i)

http://git-wip-us.apache.org/repos/asf/spark/blob/f5abd271/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index dbf3159..8fa143e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -78,7 +78,7 @@ case class AllDataTypesWithNonPrimitiveType(
     booleanField: Boolean,
     array: Seq[Int],
     map: Map[Int, String],
-    nested: Nested)
+    data: Data)
 
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.
@@ -138,7 +138,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.sparkContext.parallelize(range)
       .map(x => AllDataTypesWithNonPrimitiveType(
         s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
-        Seq(x), Map(x -> s"$x"), Nested(x, s"$x")))
+        (0 until x), (0 until x).map(i => i -> s"$i").toMap, Data((0 until x), Nested(x, s"$x"))))
       .saveAsParquetFile(tempDir)
     val result = parquetFile(tempDir).collect()
     range.foreach {
@@ -151,9 +151,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
         assert(result(i).getShort(5) === i.toShort)
         assert(result(i).getByte(6) === i.toByte)
         assert(result(i).getBoolean(7) === (i % 2 == 0))
-        assert(result(i)(8) === Seq(i))
-        assert(result(i)(9) === Map(i -> s"$i"))
-        assert(result(i)(10) === new GenericRow(Array[Any](i, s"$i")))
+        assert(result(i)(8) === (0 until i))
+        assert(result(i)(9) === (0 until i).map(i => i -> s"$i").toMap)
+        assert(result(i)(10) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
     }
   }