Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/22 00:54:28 UTC

[spark] branch master updated: [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77a8fb445cb [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
77a8fb445cb is described below

commit 77a8fb445cb099e11ed486447959b3de0a625b6d
Author: yaohua <ya...@databricks.com>
AuthorDate: Tue Nov 22 09:54:14 2022 +0900

    [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
    
    ### What changes were proposed in this pull request?
    In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` overrides `nullable` to `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L44)), while the analyzed plan exposes `_metadata` as a nullable attribute, so the analyzed and executed plans end up disagreeing on the column's nullability.
    
    This PR fixes that by making the `_metadata` column always non-nullable. Rationale:
    1. By definition, `_metadata` for file-based sources is never null;
    2. If users have already persisted this nullable `_metadata` somewhere, it is still perfectly fine to write non-nullable data into that nullable column.
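    
    A minimal sketch of the mismatch and of why the explicit `KnownNotNull` helps (hedged: it only exercises the catalyst expression API with stand-in literals, not the real metadata fields):
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Alias, CreateStruct, KnownNotNull, Literal}
    
    // CreateNamedStruct (what CreateStruct builds) hard-codes nullable = false,
    // and Alias simply forwards its child's nullability...
    val struct = CreateStruct(Seq(Literal("f0.json"), Literal(128L)))
    assert(!struct.nullable)
    assert(!Alias(struct, "_metadata")().nullable)
    
    // ...so declaring the attribute non-nullable and wrapping the struct in
    // KnownNotNull keeps the analyzed and executed plans in agreement.
    assert(!KnownNotNull(struct).nullable)
    ```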
    
    ### Why are the changes needed?
    For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatches, we should keep the nullability of `_metadata` consistent.
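    
    A quick way to confirm the two plans agree (a hedged sketch assuming an existing `SparkSession` named `spark` and JSON files under a hypothetical `/tmp/data` directory):
    
    ```scala
    // The state schema checker compares the schema recorded for earlier batches
    // with the one reported by later batches, so the analyzed and executed plans
    // must agree on _metadata's nullability.
    val df = spark.read.format("json").load("/tmp/data").select("_metadata")
    val analyzedNullable = df.queryExecution.analyzed.schema("_metadata").nullable
    val executedNullable = df.queryExecution.executedPlan.schema("_metadata").nullable
    assert(!analyzedNullable && analyzedNullable == executedNullable)
    ```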
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New UT
    
    Closes #38683 from Yaohua628/spark-41151.
    
    Authored-by: yaohua <ya...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/catalyst/expressions/namedExpressions.scala     |  5 +++--
 .../sql/execution/datasources/FileSourceStrategy.scala  |  6 +++++-
 .../execution/datasources/FileMetadataStructSuite.scala | 17 ++++++++++++++++-
 3 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 99e5f411bdb..8dd28e9aaae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -464,8 +464,9 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
-  def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
-    AttributeReference(name, dataType, nullable,
+  def apply(name: String, dataType: DataType): AttributeReference =
+    // Metadata column for file sources is always not nullable.
+    AttributeReference(name, dataType, nullable = false,
       new MetadataBuilder()
         .putBoolean(METADATA_COL_ATTR_KEY, value = true)
         .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true).build())()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 576801d3dd5..476d6579b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -275,8 +275,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
                 .get.withName(FileFormat.ROW_INDEX)
           }
         }
+        // SPARK-41151: metadata column is not nullable for file sources.
+        // Here we explicitly wrap `CreateStruct(structColumns)` in `KnownNotNull`
+        // to avoid any risk of inconsistent schema nullability.
         val metadataAlias =
-          Alias(CreateStruct(structColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
+          Alias(KnownNotNull(CreateStruct(structColumns)),
+            METADATA_NAME)(exprId = metadataStruct.exprId)
         execution.ProjectExec(
           readDataColumns ++ partitionColumns :+ metadataAlias, scan)
       }.getOrElse(scan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index e0e208b62f1..a39a36a4f83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       val df2 = spark.read.format("json")
         .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
       // Verify self-join results
-      assert(streamQuery2.lastProgress.numInputRows == 4L)
+      assert(streamQuery2.lastProgress.numInputRows == 2L)
       assert(df2.count() == 2L)
       assert(df2.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
     }
@@ -654,4 +654,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  metadataColumnsTest("SPARK-41151: consistent _metadata nullability " +
+    "between analyzed and executed", schema) { (df, _, _) =>
+    val queryExecution = df.select("_metadata").queryExecution
+    val analyzedSchema = queryExecution.analyzed.schema
+    val executedSchema = queryExecution.executedPlan.schema
+    assert(analyzedSchema.fields.head.name == "_metadata")
+    assert(executedSchema.fields.head.name == "_metadata")
+    // For stateful streaming, we store the schema in the state store
+    // and check consistency across batches.
+    // To avoid state schema compatibility mismatches,
+    // we should keep nullability consistent for the _metadata struct.
+    assert(!analyzedSchema.fields.head.nullable)
+    assert(analyzedSchema.fields.head.nullable == executedSchema.fields.head.nullable)
+  }
 }

