You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/02/20 13:54:46 UTC

[spark] branch branch-2.4 updated: [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 274142b  [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer
274142b is described below

commit 274142be08eb3a4239046d7f7260c7284ed041c2
Author: Ivan Vergiliev <iv...@gmail.com>
AuthorDate: Wed Feb 20 21:49:38 2019 +0800

    [SPARK-26859][SQL] Fix field writer index bug in non-vectorized ORC deserializer
    
    ## What changes were proposed in this pull request?
    
    This happens in a schema evolution use case, and only when a user specifies the schema manually and uses the non-vectorized ORC deserializer code path.
    
    There is a bug in `OrcDeserializer.scala` that results in `null`s being set at the wrong column position, and in state from previous records remaining uncleared in subsequent records. More details on exactly when the bug is triggered and what the outcome is can be found in the [JIRA issue](https://jira.apache.org/jira/browse/SPARK-26859).
    
    The high-level summary is that this bug results in severe data correctness issues, but fortunately the set of conditions required to expose the bug is complicated, which makes the surface area somewhat small.
    
    This change fixes the problem and adds a corresponding test.
    
    ## How was this patch tested?
    
    Passes Jenkins with the newly added test cases.
    
    Closes #23766 from IvanVergiliev/fix-orc-deserializer.
    
    Lead-authored-by: Ivan Vergiliev <iv...@gmail.com>
    Co-authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 096552ae4d6fcef5e20c54384a2687db41ba2fa1)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/orc/OrcDeserializer.scala          | 34 +++++++++++--------
 .../execution/datasources/ReadSchemaSuite.scala    |  6 ++++
 .../sql/execution/datasources/ReadSchemaTest.scala | 39 +++++++++++++++++++++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
index 4ecc54b..decd5c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala
@@ -37,28 +37,34 @@ class OrcDeserializer(
 
   private val resultRow = new SpecificInternalRow(requiredSchema.map(_.dataType))
 
+  // `fieldWriters(index)` is
+  // - null if the respective source column is missing, since the output value
+  //   is always null in this case
+  // - a function that updates target column `index` otherwise.
   private val fieldWriters: Array[WritableComparable[_] => Unit] = {
     requiredSchema.zipWithIndex
-      // The value of missing columns are always null, do not need writers.
-      .filterNot { case (_, index) => requestedColIds(index) == -1 }
       .map { case (f, index) =>
-        val writer = newWriter(f.dataType, new RowUpdater(resultRow))
-        (value: WritableComparable[_]) => writer(index, value)
+        if (requestedColIds(index) == -1) {
+          null
+        } else {
+          val writer = newWriter(f.dataType, new RowUpdater(resultRow))
+          (value: WritableComparable[_]) => writer(index, value)
+        }
       }.toArray
   }
 
-  private val validColIds = requestedColIds.filterNot(_ == -1)
-
   def deserialize(orcStruct: OrcStruct): InternalRow = {
-    var i = 0
-    while (i < validColIds.length) {
-      val value = orcStruct.getFieldValue(validColIds(i))
-      if (value == null) {
-        resultRow.setNullAt(i)
-      } else {
-        fieldWriters(i)(value)
+    var targetColumnIndex = 0
+    while (targetColumnIndex < fieldWriters.length) {
+      if (fieldWriters(targetColumnIndex) != null) {
+        val value = orcStruct.getFieldValue(requestedColIds(targetColumnIndex))
+        if (value == null) {
+          resultRow.setNullAt(targetColumnIndex)
+        } else {
+          fieldWriters(targetColumnIndex)(value)
+        }
       }
-      i += 1
+      targetColumnIndex += 1
     }
     resultRow
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
index 23c58e1..de234c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaSuite.scala
@@ -72,6 +72,7 @@ class HeaderCSVReadSchemaSuite
 
 class JsonReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest
   with IntegralTypeTest
@@ -84,6 +85,7 @@ class JsonReadSchemaSuite
 
 class OrcReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -103,6 +105,7 @@ class OrcReadSchemaSuite
 
 class VectorizedOrcReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest
   with BooleanTypeTest
@@ -125,6 +128,7 @@ class VectorizedOrcReadSchemaSuite
 
 class ParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -144,6 +148,7 @@ class ParquetReadSchemaSuite
 
 class VectorizedParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
@@ -163,6 +168,7 @@ class VectorizedParquetReadSchemaSuite
 
 class MergedParquetReadSchemaSuite
   extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
   with HideColumnInTheMiddleTest
   with ChangePositionTest {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
index 2a5457e..17d9d43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/ReadSchemaTest.scala
@@ -69,7 +69,7 @@ trait ReadSchemaTest extends QueryTest with SQLTestUtils with SharedSQLContext {
 }
 
 /**
- * Add column (Case 1).
+ * Add column (Case 1-1).
  * This test suite assumes that the missing column should be `null`.
  */
 trait AddColumnTest extends ReadSchemaTest {
@@ -109,6 +109,43 @@ trait AddColumnTest extends ReadSchemaTest {
 }
 
 /**
+ * Add column into the middle (Case 1-2).
+ */
+trait AddColumnIntoTheMiddleTest extends ReadSchemaTest {
+  import testImplicits._
+
+  test("append column into middle") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = Seq((1, 2, "abc"), (4, 5, "def"), (8, 9, null)).toDF("col1", "col2", "col3")
+      val df2 = Seq((10, null, 20, null), (40, "uvw", 50, "xyz"), (80, null, 90, null))
+        .toDF("col1", "col4", "col2", "col3")
+
+      val dir1 = s"$path${File.separator}part=one"
+      val dir2 = s"$path${File.separator}part=two"
+
+      df1.write.format(format).options(options).save(dir1)
+      df2.write.format(format).options(options).save(dir2)
+
+      val df = spark.read
+        .schema(df2.schema)
+        .format(format)
+        .options(options)
+        .load(path)
+
+      checkAnswer(df, Seq(
+        Row(1, null, 2, "abc", "one"),
+        Row(4, null, 5, "def", "one"),
+        Row(8, null, 9, null, "one"),
+        Row(10, null, 20, null, "two"),
+        Row(40, "uvw", 50, "xyz", "two"),
+        Row(80, null, 90, null, "two")))
+    }
+  }
+}
+
+/**
  * Hide column (Case 2-1).
  */
 trait HideColumnAtTheEndTest extends ReadSchemaTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org