Posted to commits@spark.apache.org by we...@apache.org on 2022/07/19 01:08:00 UTC

[spark] branch branch-3.3 updated: [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new aeafb175875 [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query
aeafb175875 is described below

commit aeafb175875c00519e03e0ea5b5f22f765dc3607
Author: Ala Luszczak <al...@databricks.com>
AuthorDate: Tue Jul 19 09:04:03 2022 +0800

    [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query
    
    This change alters the projection used in `FileScanRDD` to attach file metadata to a
    row produced by the reader. This projection used to remove the partitioning columns
    from the produced row. As a result, the produced row had a different schema than the
    consumers expected and was missing part of the data, which caused the query to fail.
    
    This is a bug. `FileScanRDD` should produce rows that match the expected schema and
    contain all the requested data. Queries should not crash due to internal errors.
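    
    For illustration, here is a minimal repro sketch of the failure, mirroring the new
    test below. The local session setup and output path are assumptions made for the
    sketch, not part of this patch:
    
        import org.apache.spark.sql.SparkSession
    
        // Assumed local session for the sketch.
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
    
        // Hypothetical output path.
        val dir = "/tmp/spark-39806-repro"
    
        // Write dynamically partitioned data: `b` becomes a partition column.
        Seq(1 -> 1).toDF("a", "b").write.format("parquet").partitionBy("b").save(dir)
    
        // Before this fix, selecting `*` together with `_metadata` fields on a
        // partitioned table failed with an internal error: the partition column `b`
        // was dropped from the row before the metadata projection was applied.
        spark.read.parquet(dir)
          .select("*", "_metadata.file_name", "_metadata.file_size")
          .show()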
    
    No user-facing change.
    
    Tested by adding a new test in `FileMetadataStructSuite.scala` that reproduces the issue.
    
    Closes #37214 from ala/metadata-partition-by.
    
    Authored-by: Ala Luszczak <al...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 385f1c8e4037928afafbf6664e30dc268510c05e)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  4 ++--
 .../sql/execution/datasources/FileScanRDD.scala    |  4 ++--
 .../datasources/FileMetadataStructSuite.scala      | 26 ++++++++++++++++++++++
 3 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9e8ae9a714d..40d29af28f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -621,7 +621,7 @@ case class FileSourceScanExec(
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
-      requiredSchema, metadataColumns)
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
   }
 
   /**
@@ -678,7 +678,7 @@ case class FileSourceScanExec(
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
-      requiredSchema, metadataColumns)
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 20c393a5c0e..b65b36ef393 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -68,7 +68,7 @@ class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition],
-    val readDataSchema: StructType,
+    val readSchema: StructType,
     val metadataColumns: Seq[AttributeReference] = Seq.empty)
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
@@ -126,7 +126,7 @@ class FileScanRDD(
       // an unsafe projection to convert a joined internal row to an unsafe row
       private lazy val projection = {
         val joinedExpressions =
-          readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+          readSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
         UnsafeProjection.create(joinedExpressions)
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 410fc985dd3..6afea42ee83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
@@ -30,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructFiel
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
+  import testImplicits._
+
   val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom")))
 
   val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb")))
@@ -564,4 +567,27 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  Seq(true, false).foreach { useVectorizedReader =>
+    val label = if (useVectorizedReader) "reading batches" else "reading rows"
+    test(s"SPARK-39806: metadata for a partitioned table ($label)") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
+        withTempPath { dir =>
+          // Store dynamically partitioned data.
+          Seq(1 -> 1).toDF("a", "b").write.format("parquet").partitionBy("b")
+            .save(dir.getAbsolutePath)
+
+          // Identify the data file and its metadata.
+          val file = TestUtils.recursiveList(dir)
+            .filter(_.getName.endsWith(".parquet")).head
+          val expectedDf = Seq(1 -> 1).toDF("a", "b")
+            .withColumn(FileFormat.FILE_NAME, lit(file.getName))
+            .withColumn(FileFormat.FILE_SIZE, lit(file.length()))
+
+          checkAnswer(spark.read.parquet(dir.getAbsolutePath)
+            .select("*", METADATA_FILE_NAME, METADATA_FILE_SIZE), expectedDf)
+        }
+      }
+    }
+  }
 }
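
The core of the fix, as shown in the `DataSourceScanExec.scala` hunks above, is to hand
`FileScanRDD` a schema covering what the reader actually emits: the requested data
columns plus the partition columns. A minimal sketch of that schema construction (the
field names `a` and `b` are illustrative, taken from the test above):

    import org.apache.spark.sql.types._

    // Columns read from the files themselves.
    val requiredSchema = new StructType().add("a", IntegerType)
    // Columns derived from the directory layout (e.g. b=1/).
    val partitionSchema = new StructType().add("b", IntegerType)

    // Rows produced by the reader carry both data and partition columns, so the
    // unsafe projection in FileScanRDD must be built over their concatenation:
    val readSchema = new StructType(requiredSchema.fields ++ partitionSchema.fields)
    // readSchema is struct<a:int,b:int>, matching the rows the reader emits.

Renaming the `FileScanRDD` parameter from `readDataSchema` to `readSchema` reflects that
it now covers more than the on-file data columns.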


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org