You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/19 01:04:24 UTC

[spark] branch master updated: [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 385f1c8e403 [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query
385f1c8e403 is described below

commit 385f1c8e4037928afafbf6664e30dc268510c05e
Author: Ala Luszczak <al...@databricks.com>
AuthorDate: Tue Jul 19 09:04:03 2022 +0800

    [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query
    
    ### What changes were proposed in this pull request?
    
    This change alters the projection used in `FileScanRDD` to attach file metadata to a row produced by the reader. This
    projection used to remove the partitioning columns from the produced row. As a result, the produced row had a different schema than the one expected by the consumers and was missing part of the data, which resulted in query failure.
    
    ### Why are the changes needed?
    
    This is a bug. `FileScanRDD` should produce rows that match the expected schema and contain all the requested data. Queries should not crash due to internal errors.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Adds a new test in `FileMetadataStructSuite.scala` that reproduces the issue.
    
    Closes #37214 from ala/metadata-partition-by.
    
    Authored-by: Ala Luszczak <al...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  7 ++++--
 .../sql/execution/datasources/FileScanRDD.scala    |  4 ++--
 .../datasources/FileMetadataStructSuite.scala      | 26 ++++++++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9e316cc88cf..5950136e79a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -650,7 +650,9 @@ case class FileSourceScanExec(
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
-      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+
   }
 
   /**
@@ -707,7 +709,8 @@ case class FileSourceScanExec(
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
-      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 97776413509..4c3f5629e78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -69,7 +69,7 @@ class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition],
-    val readDataSchema: StructType,
+    val readSchema: StructType,
     val metadataColumns: Seq[AttributeReference] = Seq.empty,
     options: FileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(Map.empty)))
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
@@ -128,7 +128,7 @@ class FileScanRDD(
       // an unsafe projection to convert a joined internal row to an unsafe row
       private lazy val projection = {
         val joinedExpressions =
-          readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+          readSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
         UnsafeProjection.create(joinedExpressions)
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 410fc985dd3..6afea42ee83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
@@ -30,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructFiel
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
+  import testImplicits._
+
   val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom")))
 
   val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb")))
@@ -564,4 +567,27 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  Seq(true, false).foreach { useVectorizedReader =>
+    val label = if (useVectorizedReader) "reading batches" else "reading rows"
+    test(s"SPARK-39806: metadata for a partitioned table ($label)") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
+        withTempPath { dir =>
+          // Store dynamically partitioned data.
+          Seq(1 -> 1).toDF("a", "b").write.format("parquet").partitionBy("b")
+            .save(dir.getAbsolutePath)
+
+          // Identify the data file and its metadata.
+          val file = TestUtils.recursiveList(dir)
+            .filter(_.getName.endsWith(".parquet")).head
+          val expectedDf = Seq(1 -> 1).toDF("a", "b")
+            .withColumn(FileFormat.FILE_NAME, lit(file.getName))
+            .withColumn(FileFormat.FILE_SIZE, lit(file.length()))
+
+          checkAnswer(spark.read.parquet(dir.getAbsolutePath)
+            .select("*", METADATA_FILE_NAME, METADATA_FILE_SIZE), expectedDf)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org