You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/11/22 00:54:28 UTC
[spark] branch master updated: [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 77a8fb445cb [SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
77a8fb445cb is described below
commit 77a8fb445cb099e11ed486447959b3de0a625b6d
Author: yaohua <ya...@databricks.com>
AuthorDate: Tue Nov 22 09:54:14 2022 +0900
[SPARK-41151][SQL] Keep built-in file `_metadata` column nullable value consistent
### What changes were proposed in this pull request?
In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` has an override `nullable` value `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L44 [...]
This PR fixes this inconsistency by making the `_metadata` column always non-nullable. Rationale:
1. By definition, `_metadata` for file-based sources is never null;
2. If users have already persisted this nullable `_metadata` somewhere, it is still safe to write non-nullable data into that nullable column.
### Why are the changes needed?
For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatches, we should keep the nullability of `_metadata` consistent.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UT
Closes #38683 from Yaohua628/spark-41151.
Authored-by: yaohua <ya...@databricks.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/catalyst/expressions/namedExpressions.scala | 5 +++--
.../sql/execution/datasources/FileSourceStrategy.scala | 6 +++++-
.../execution/datasources/FileMetadataStructSuite.scala | 17 ++++++++++++++++-
3 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 99e5f411bdb..8dd28e9aaae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -464,8 +464,9 @@ object FileSourceMetadataAttribute {
val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
- def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
- AttributeReference(name, dataType, nullable,
+ def apply(name: String, dataType: DataType): AttributeReference =
+ // Metadata column for file sources is always not nullable.
+ AttributeReference(name, dataType, nullable = false,
new MetadataBuilder()
.putBoolean(METADATA_COL_ATTR_KEY, value = true)
.putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true).build())()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 576801d3dd5..476d6579b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -275,8 +275,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
.get.withName(FileFormat.ROW_INDEX)
}
}
+ // SPARK-41151: metadata column is not nullable for file sources.
+ // Here, we *explicitly* enforce the not null to `CreateStruct(structColumns)`
+ // to avoid any risk of inconsistent schema nullability
val metadataAlias =
- Alias(CreateStruct(structColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
+ Alias(KnownNotNull(CreateStruct(structColumns)),
+ METADATA_NAME)(exprId = metadataStruct.exprId)
execution.ProjectExec(
readDataColumns ++ partitionColumns :+ metadataAlias, scan)
}.getOrElse(scan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index e0e208b62f1..a39a36a4f83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
val df2 = spark.read.format("json")
.load(dir.getCanonicalPath + "/target/new-streaming-data-join")
// Verify self-join results
- assert(streamQuery2.lastProgress.numInputRows == 4L)
+ assert(streamQuery2.lastProgress.numInputRows == 2L)
assert(df2.count() == 2L)
assert(df2.select("*").columns.toSet == Set("name", "age", "info", "_metadata"))
}
@@ -654,4 +654,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+ metadataColumnsTest("SPARK-41151: consistent _metadata nullability " +
+ "between analyzed and executed", schema) { (df, _, _) =>
+ val queryExecution = df.select("_metadata").queryExecution
+ val analyzedSchema = queryExecution.analyzed.schema
+ val executedSchema = queryExecution.executedPlan.schema
+ assert(analyzedSchema.fields.head.name == "_metadata")
+ assert(executedSchema.fields.head.name == "_metadata")
+ // For stateful streaming, we store the schema in the state store
+ // and check consistency across batches.
+ // To avoid state schema compatibility mismatched,
+ // we should keep nullability consistent for _metadata struct
+ assert(!analyzedSchema.fields.head.nullable)
+ assert(analyzedSchema.fields.head.nullable == executedSchema.fields.head.nullable)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org