Posted to commits@spark.apache.org by su...@apache.org on 2022/04/27 03:46:00 UTC

[spark] branch branch-3.3 updated: [SPARK-34863][SQL][FOLLOW-UP] Handle IsAllNull in OffHeapColumnVector

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 80efaa2e979 [SPARK-34863][SQL][FOLLOW-UP] Handle IsAllNull in OffHeapColumnVector
80efaa2e979 is described below

commit 80efaa2e979276643df35b03f9c44c31340a62b3
Author: Ivan Sadikov <iv...@databricks.com>
AuthorDate: Tue Apr 26 20:44:50 2022 -0700

    [SPARK-34863][SQL][FOLLOW-UP] Handle IsAllNull in OffHeapColumnVector
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue when reading null columns with the vectorised Parquet reader, where the entire column is null or does not exist in the file. This is especially noticeable when performing a merge or schema evolution on Parquet data.
    
    The issue is only exposed with `OffHeapColumnVector`, which does not handle the `isAllNull` flag; `OnHeapColumnVector` already handles `isAllNull`, so reads work correctly there.
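    
    A minimal, self-contained sketch of the pattern involved (a toy vector for illustration, not the actual Spark class, which keeps its null bytes in native memory via `Platform`): the column-wide "all null" flag has to be consulted before the per-row null bytes.
    
    ```scala
    // Toy model of the null tracking described above; illustrative only.
    class ToyColumnVector(capacity: Int) {
      private val nulls = new Array[Byte](capacity) // 1 = null at that row
      private var allNull = false                   // set when the whole column is missing or null
    
      def setAllNull(): Unit = { allNull = true }
    
      def putNull(rowId: Int): Unit = { nulls(rowId) = 1 }
    
      // The fix mirrors this: check the column-wide flag first, then the per-row byte.
      def isNullAt(rowId: Int): Boolean = allNull || nulls(rowId) == 1
    }
    ```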
    
    ### Why are the changes needed?
    
    The change is needed to correctly read null columns with the vectorised reader in off-heap mode.
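    
    As a rough illustration (not part of this patch; the path and column names below are made up), the off-heap code path is exercised by enabling the column-vector off-heap config and reading a Parquet file with a read schema that contains a column absent from the file:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Hypothetical reproduction sketch, assuming the standard config keys behind
    // SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED and
    // SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.columnVector.offheap.enabled", "true")
      .config("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
      .getOrCreate()
    import spark.implicits._
    
    val path = "/tmp/parquet-allnull-demo"  // example path
    Seq((1, "a"), (2, "b")).toDF("id", "name").write.mode("overwrite").parquet(path)
    
    // Read with an extra column that does not exist in the file; the vectorized
    // reader materialises it as an all-null column vector.
    spark.read.schema("id INT, name STRING, missing_col BIGINT").parquet(path).show()
    ```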
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    I updated the existing unit tests to cover off-heap mode and confirmed that they pass with the fix and fail without it.
    
    Closes #36366 from sadikovi/fix-off-heap-cv.
    
    Authored-by: Ivan Sadikov <iv...@databricks.com>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../execution/vectorized/OffHeapColumnVector.java  |   2 +-
 .../datasources/parquet/ParquetIOSuite.scala       | 102 ++++++++++++---------
 .../parquet/ParquetInteroperabilitySuite.scala     |  80 ++++++++--------
 3 files changed, 100 insertions(+), 84 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 42552c7afc6..711c00856e9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -132,7 +132,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
 
   @Override
   public boolean isNullAt(int rowId) {
-    return Platform.getByte(null, nulls + rowId) == 1;
+    return isAllNull || Platform.getByte(null, nulls + rowId) == 1;
   }
 
   //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4d01db999fb..5cd1bffdb50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -376,23 +376,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("vectorized reader: missing array") {
-    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
-      val data = Seq(
-        Tuple1(null),
-        Tuple1(Seq()),
-        Tuple1(Seq("a", "b", "c")),
-        Tuple1(Seq(null))
-      )
-
-      val readSchema = new StructType().add("_2", new ArrayType(
-        new StructType().add("a", LongType, nullable = true),
-        containsNull = true)
-      )
+    Seq(true, false).foreach { offheapEnabled =>
+      withSQLConf(
+          SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+        val data = Seq(
+          Tuple1(null),
+          Tuple1(Seq()),
+          Tuple1(Seq("a", "b", "c")),
+          Tuple1(Seq(null))
+        )
 
-      withParquetFile(data) { file =>
-        checkAnswer(spark.read.schema(readSchema).parquet(file),
-          Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil
+        val readSchema = new StructType().add("_2", new ArrayType(
+          new StructType().add("a", LongType, nullable = true),
+          containsNull = true)
         )
+
+        withParquetFile(data) { file =>
+          checkAnswer(spark.read.schema(readSchema).parquet(file),
+            Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil
+          )
+        }
       }
     }
   }
@@ -666,45 +670,53 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("vectorized reader: missing all struct fields") {
-    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
-      val data = Seq(
-        Tuple1((1, "a")),
-        Tuple1((2, null)),
-        Tuple1(null)
-      )
+    Seq(true, false).foreach { offheapEnabled =>
+      withSQLConf(
+          SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+        val data = Seq(
+          Tuple1((1, "a")),
+          Tuple1((2, null)),
+          Tuple1(null)
+        )
 
-      val readSchema = new StructType().add("_1",
-        new StructType()
-            .add("_3", IntegerType, nullable = true)
-            .add("_4", LongType, nullable = true),
-        nullable = true)
+        val readSchema = new StructType().add("_1",
+          new StructType()
+              .add("_3", IntegerType, nullable = true)
+              .add("_4", LongType, nullable = true),
+          nullable = true)
 
-      withParquetFile(data) { file =>
-        checkAnswer(spark.read.schema(readSchema).parquet(file),
-          Row(null) :: Row(null) :: Row(null) :: Nil
-        )
+        withParquetFile(data) { file =>
+          checkAnswer(spark.read.schema(readSchema).parquet(file),
+            Row(null) :: Row(null) :: Row(null) :: Nil
+          )
+        }
       }
     }
   }
 
   test("vectorized reader: missing some struct fields") {
-    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
-      val data = Seq(
-        Tuple1((1, "a")),
-        Tuple1((2, null)),
-        Tuple1(null)
-      )
+    Seq(true, false).foreach { offheapEnabled =>
+      withSQLConf(
+          SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+          SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+        val data = Seq(
+          Tuple1((1, "a")),
+          Tuple1((2, null)),
+          Tuple1(null)
+        )
 
-      val readSchema = new StructType().add("_1",
-        new StructType()
-            .add("_1", IntegerType, nullable = true)
-            .add("_3", LongType, nullable = true),
-        nullable = true)
+        val readSchema = new StructType().add("_1",
+          new StructType()
+              .add("_1", IntegerType, nullable = true)
+              .add("_3", LongType, nullable = true),
+          nullable = true)
 
-      withParquetFile(data) { file =>
-        checkAnswer(spark.read.schema(readSchema).parquet(file),
-          Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil
-        )
+        withParquetFile(data) { file =>
+          checkAnswer(spark.read.schema(readSchema).parquet(file),
+            Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil
+          )
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index a7395a61992..8b386e8f689 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -102,47 +102,51 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
     // the data can be correctly read back.
 
     Seq(false, true).foreach { legacyMode =>
-      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString) {
-        withTempPath { tableDir =>
-          val schema1 = StructType(
-            StructField("col-0", ArrayType(
-              StructType(
-                StructField("col-0", IntegerType, true) ::
-                Nil
-              ),
-              containsNull = false // allows to create 2-level Parquet LIST type in legacy mode
-            )) ::
-            Nil
-          )
-          val row1 = Row(Seq(Row(1)))
-          val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1)
-          df1.write.parquet(tableDir.getAbsolutePath)
+      Seq(false, true).foreach { offheapEnabled =>
+        withSQLConf(
+            SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString,
+            SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+          withTempPath { tableDir =>
+            val schema1 = StructType(
+              StructField("col-0", ArrayType(
+                StructType(
+                  StructField("col-0", IntegerType, true) ::
+                  Nil
+                ),
+                containsNull = false // allows to create 2-level Parquet LIST type in legacy mode
+              )) ::
+              Nil
+            )
+            val row1 = Row(Seq(Row(1)))
+            val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1)
+            df1.write.parquet(tableDir.getAbsolutePath)
 
-          val schema2 = StructType(
-            StructField("col-0", ArrayType(
-              StructType(
-                StructField("col-0", IntegerType, true) ::
-                StructField("col-1", IntegerType, true) :: // additional field
-                Nil
-              ),
-              containsNull = false
-            )) ::
-            Nil
-          )
-          val row2 = Row(Seq(Row(1, 2)))
-          val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2)
-          df2.write.mode("append").parquet(tableDir.getAbsolutePath)
+            val schema2 = StructType(
+              StructField("col-0", ArrayType(
+                StructType(
+                  StructField("col-0", IntegerType, true) ::
+                  StructField("col-1", IntegerType, true) :: // additional field
+                  Nil
+                ),
+                containsNull = false
+              )) ::
+              Nil
+            )
+            val row2 = Row(Seq(Row(1, 2)))
+            val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2)
+            df2.write.mode("append").parquet(tableDir.getAbsolutePath)
 
-          // Reading of data should succeed and should not fail with
-          // java.lang.ClassCastException: optional int32 col-0 is not a group
-          withAllParquetReaders {
-            checkAnswer(
-              spark.read.schema(schema2).parquet(tableDir.getAbsolutePath),
-              Seq(
-                Row(Seq(Row(1, null))),
-                Row(Seq(Row(1, 2)))
+            // Reading of data should succeed and should not fail with
+            // java.lang.ClassCastException: optional int32 col-0 is not a group
+            withAllParquetReaders {
+              checkAnswer(
+                spark.read.schema(schema2).parquet(tableDir.getAbsolutePath),
+                Seq(
+                  Row(Seq(Row(1, null))),
+                  Row(Seq(Row(1, 2)))
+                )
               )
-            )
+            }
           }
         }
       }

