Posted to commits@spark.apache.org by we...@apache.org on 2023/02/20 14:34:13 UTC

[spark] branch branch-3.4 updated: [SPARK-42423][SQL] Add metadata column file block start and length

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 18d5d5e81ad [SPARK-42423][SQL] Add metadata column file block start and length
18d5d5e81ad is described below

commit 18d5d5e81adf70b94e968da6bd5bb783ff4ceb35
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Feb 20 22:33:36 2023 +0800

    [SPARK-42423][SQL] Add metadata column file block start and length
    
    ### What changes were proposed in this pull request?
    
    Support `_metadata.file_block_start` and `_metadata.file_block_length` for datasource file metadata columns.
    
    Note that these columns cannot be used as data filters for pruning the listed files, since block start and length are only known after files are split.
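
    A minimal usage sketch, assuming an active `spark` session (e.g. spark-shell) and a placeholder path:

    ```scala
    import org.apache.spark.sql.functions.col

    // Project the new constant metadata fields alongside an existing one.
    val df = spark.read.parquet("/tmp/people")      // placeholder path
      .select(
        col("_metadata.file_path"),
        col("_metadata.file_block_start"),          // start offset of this split within the file
        col("_metadata.file_block_length"))         // length of this split
    df.show(truncate = false)
    ```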
    
    ### Why are the changes needed?
    
    To improve observability.
    
    Currently, there is a built-in function `InputFileBlockStart`, but it has some issues, e.g. it does not work with joins. It is better to encourage people to switch to the metadata column.
    
    File block length is also important information: it lets people see how Spark splits large files.
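
    As a sketch (placeholder path, `spark` session assumed), the distinct block boundaries per file show exactly where Spark cut each large file:

    ```scala
    import org.apache.spark.sql.functions.col

    // One row per split: (file name, block start, block length).
    spark.read.parquet("/tmp/big_table")            // placeholder path
      .select(
        col("_metadata.file_name"),
        col("_metadata.file_block_start"),
        col("_metadata.file_block_length"))
      .distinct()
      .show(truncate = false)
    ```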
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds two new file metadata columns: `_metadata.file_block_start` and `_metadata.file_block_length`.
    
    ### How was this patch tested?
    
    Improved existing tests and added new tests.
    
    Closes #39996 from ulysses-you/SPARK-42423.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit ae97131f1afa5deac2bd183872cedd8829024efa)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/datasources/FileFormat.scala     |  17 +++-
 .../sql/execution/datasources/FileScanRDD.scala    |  11 ++-
 .../execution/datasources/FileSourceStrategy.scala |   1 +
 .../datasources/PartitioningAwareFileIndex.scala   |   5 +-
 .../FileMetadataStructRowIndexSuite.scala          |   3 +-
 .../datasources/FileMetadataStructSuite.scala      | 108 ++++++++++++++++++---
 6 files changed, 125 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 8811c1fd5f8..3d7e2c8bf3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -184,6 +184,10 @@ object FileFormat {
 
   val FILE_NAME = "file_name"
 
+  val FILE_BLOCK_START = "file_block_start"
+
+  val FILE_BLOCK_LENGTH = "file_block_length"
+
   val FILE_SIZE = "file_size"
 
   val FILE_MODIFICATION_TIME = "file_modification_time"
@@ -212,6 +216,8 @@ object FileFormat {
     .add(StructField(FileFormat.FILE_PATH, StringType, nullable = false))
     .add(StructField(FileFormat.FILE_NAME, StringType, nullable = false))
     .add(StructField(FileFormat.FILE_SIZE, LongType, nullable = false))
+    .add(StructField(FileFormat.FILE_BLOCK_START, LongType, nullable = false))
+    .add(StructField(FileFormat.FILE_BLOCK_LENGTH, LongType, nullable = false))
     .add(StructField(FileFormat.FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   /**
@@ -231,9 +237,12 @@ object FileFormat {
       fieldNames: Seq[String],
       filePath: Path,
       fileSize: Long,
-      fileModificationTime: Long): InternalRow =
+      fileModificationTime: Long): InternalRow = {
+    // We are not aware of `FILE_BLOCK_START` and `FILE_BLOCK_LENGTH` before splitting files
+    assert(!fieldNames.contains(FILE_BLOCK_START) && !fieldNames.contains(FILE_BLOCK_LENGTH))
     updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
-      filePath, fileSize, fileModificationTime)
+      filePath, fileSize, 0L, fileSize, fileModificationTime)
+  }
 
   // update an internal row given required metadata fields and file information
   def updateMetadataInternalRow(
@@ -241,12 +250,16 @@ object FileFormat {
       fieldNames: Seq[String],
       filePath: Path,
       fileSize: Long,
+      fileBlockStart: Long,
+      fileBlockLength: Long,
       fileModificationTime: Long): InternalRow = {
     fieldNames.zipWithIndex.foreach { case (name, i) =>
       name match {
         case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
         case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
         case FILE_SIZE => row.update(i, fileSize)
+        case FILE_BLOCK_START => row.update(i, fileBlockStart)
+        case FILE_BLOCK_LENGTH => row.update(i, fileBlockLength)
         case FILE_MODIFICATION_TIME =>
           // the modificationTime from the file is in millisecond,
           // while internally, the TimestampType `file_modification_time` is stored in microsecond
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 0ccf72823f1..7fb2d9c8ac7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -147,7 +147,8 @@ class FileScanRDD(
       private def updateMetadataRow(): Unit =
         if (metadataColumns.nonEmpty && currentFile != null) {
           updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
-            currentFile.toPath, currentFile.fileSize, currentFile.modificationTime)
+            currentFile.toPath, currentFile.fileSize, currentFile.start, currentFile.length,
+            currentFile.modificationTime)
         }
 
       /**
@@ -168,6 +169,14 @@ class FileScanRDD(
             val columnVector = new ConstantColumnVector(c.numRows(), LongType)
             columnVector.setLong(currentFile.fileSize)
             columnVector
+          case FILE_BLOCK_START =>
+            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
+            columnVector.setLong(currentFile.start)
+            columnVector
+          case FILE_BLOCK_LENGTH =>
+            val columnVector = new ConstantColumnVector(c.numRows(), LongType)
+            columnVector.setLong(currentFile.length)
+            columnVector
           case FILE_MODIFICATION_TIME =>
             val columnVector = new ConstantColumnVector(c.numRows(), LongType)
             // the modificationTime from the file is in millisecond,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index a4ffcb5729d..f48e44d1aab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -313,6 +313,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
         val structColumns = metadataColumns.map { col => col.name match {
             case FileFormat.FILE_PATH | FileFormat.FILE_NAME | FileFormat.FILE_SIZE |
+                 FileFormat.FILE_BLOCK_START | FileFormat.FILE_BLOCK_LENGTH |
                  FileFormat.FILE_MODIFICATION_TIME =>
               col
             case FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 9578121a386..e81882ca6c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -77,7 +77,10 @@ abstract class PartitioningAwareFileIndex(
     // be applied to files.
     val fileMetadataFilterOpt = dataFilters.filter { f =>
       f.references.nonEmpty && f.references.forall {
-        case FileSourceConstantMetadataAttribute(_) => true
+        case FileSourceConstantMetadataAttribute(metadataAttr) =>
+          // we only know block start and length after splitting files, so skip it here
+          metadataAttr.name != FileFormat.FILE_BLOCK_START &&
+            metadataAttr.name != FileFormat.FILE_BLOCK_LENGTH
         case _ => false
       }
     }.reduceOption(expressions.And)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
index 61d81125e68..27a71791669 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructRowIndexSuite.scala
@@ -129,7 +129,8 @@ class FileMetadataStructRowIndexSuite extends QueryTest with SharedSparkSession
         errorClass = "FIELD_NOT_FOUND",
         parameters = Map(
           "fieldName" -> "`row_index`",
-          "fields" -> "`file_path`, `file_name`, `file_size`, `file_modification_time`"))
+          "fields" -> ("`file_path`, `file_name`, `file_size`, " +
+            "`file_block_start`, `file_block_length`, `file_modification_time`")))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index a85857a93a3..f22181160a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -58,6 +58,10 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
   private val METADATA_FILE_SIZE = "_metadata.file_size"
 
+  private val METADATA_FILE_BLOCK_START = "_metadata.file_block_start"
+
+  private val METADATA_FILE_BLOCK_LENGTH = "_metadata.file_block_length"
+
   private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"
 
   private val METADATA_ROW_INDEX = "_metadata.row_index"
@@ -66,10 +70,12 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
   private def getMetadataRow(f: Map[String, Any]): Row = f(FILE_FORMAT) match {
     case "parquet" =>
-      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME),
+        f(METADATA_FILE_SIZE), f(METADATA_FILE_BLOCK_START), f(METADATA_FILE_BLOCK_LENGTH),
         f(METADATA_FILE_MODIFICATION_TIME), f(METADATA_ROW_INDEX))
     case _ =>
-      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME), f(METADATA_FILE_SIZE),
+      Row(f(METADATA_FILE_PATH), f(METADATA_FILE_NAME),
+        f(METADATA_FILE_SIZE), f(METADATA_FILE_BLOCK_START), f(METADATA_FILE_BLOCK_LENGTH),
         f(METADATA_FILE_MODIFICATION_TIME))
   }
 
@@ -78,6 +84,10 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       METADATA_FILE_PATH -> f.toURI.toString,
       METADATA_FILE_NAME -> f.getName,
       METADATA_FILE_SIZE -> f.length(),
+      // test file is small enough so we would not do splitting files,
+      // then the file block start is always 0 and file block length is same with file size
+      METADATA_FILE_BLOCK_START -> 0,
+      METADATA_FILE_BLOCK_LENGTH -> f.length(),
       METADATA_FILE_MODIFICATION_TIME -> new Timestamp(f.lastModified()),
       METADATA_ROW_INDEX -> 0,
       FILE_FORMAT -> f.getName.split("\\.").last
@@ -137,14 +147,17 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     checkAnswer(
       df.select("name", "age", "info",
         METADATA_FILE_NAME, METADATA_FILE_PATH,
-        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME),
+        METADATA_FILE_SIZE, METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH,
+        METADATA_FILE_MODIFICATION_TIME),
       Seq(
         Row("jack", 24, Row(12345L, "uom"),
           f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH),
-          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)),
+          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_BLOCK_START), f0(METADATA_FILE_BLOCK_LENGTH),
+          f0(METADATA_FILE_MODIFICATION_TIME)),
         Row("lily", 31, Row(54321L, "ucb"),
           f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
-          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))
+          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_BLOCK_START), f1(METADATA_FILE_BLOCK_LENGTH),
+          f1(METADATA_FILE_MODIFICATION_TIME))
       )
     )
 
@@ -234,12 +247,15 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
   metadataColumnsTest("select only metadata", schema) { (df, f0, f1) =>
     checkAnswer(
       df.select(METADATA_FILE_NAME, METADATA_FILE_PATH,
-        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME),
+        METADATA_FILE_SIZE, METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH,
+        METADATA_FILE_MODIFICATION_TIME),
       Seq(
         Row(f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH),
-          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)),
+          f0(METADATA_FILE_SIZE), f0(METADATA_FILE_BLOCK_START), f0(METADATA_FILE_BLOCK_LENGTH),
+          f0(METADATA_FILE_MODIFICATION_TIME)),
         Row(f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
-          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))
+          f1(METADATA_FILE_SIZE), f1(METADATA_FILE_BLOCK_START), f1(METADATA_FILE_BLOCK_LENGTH),
+          f1(METADATA_FILE_MODIFICATION_TIME))
       )
     )
     checkAnswer(
@@ -255,7 +271,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     checkAnswer(
       df.select("name", "age", "info",
         METADATA_FILE_NAME, METADATA_FILE_PATH,
-        METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
+        METADATA_FILE_SIZE, METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH,
+        METADATA_FILE_MODIFICATION_TIME)
         .select("name", "file_path"), // cast _metadata.file_path as file_path
       Seq(
         Row("jack", f0(METADATA_FILE_PATH)),
@@ -332,7 +349,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
     val filteredDF = df.select("name", "age", "info",
       METADATA_FILE_NAME, METADATA_FILE_PATH,
-      METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
+      METADATA_FILE_SIZE, METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH,
+      METADATA_FILE_MODIFICATION_TIME)
       // mix metadata column + user column
       .where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
       // only metadata columns
@@ -354,7 +372,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       filteredDF,
       Seq(Row("lily", 31, Row(54321L, "ucb"),
         f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
-        f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        f1(METADATA_FILE_SIZE), f1(METADATA_FILE_BLOCK_START), f1(METADATA_FILE_BLOCK_LENGTH),
+        f1(METADATA_FILE_MODIFICATION_TIME)))
     )
   }
 
@@ -420,12 +439,15 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
         checkAnswer(
           df.select("name", "age", "info",
             METADATA_FILE_NAME, METADATA_FILE_PATH,
-            METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME),
+            METADATA_FILE_SIZE, METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH,
+            METADATA_FILE_MODIFICATION_TIME),
           Seq(
             Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH),
-              f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)),
+              f0(METADATA_FILE_SIZE), f0(METADATA_FILE_BLOCK_START), f0(METADATA_FILE_BLOCK_LENGTH),
+              f0(METADATA_FILE_MODIFICATION_TIME)),
             Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
-              f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))
+              f1(METADATA_FILE_SIZE), f1(METADATA_FILE_BLOCK_START), f1(METADATA_FILE_BLOCK_LENGTH),
+              f1(METADATA_FILE_MODIFICATION_TIME))
           )
         )
 
@@ -561,6 +583,8 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
         METADATA_FILE_PATH -> sourceFile.toURI.toString,
         METADATA_FILE_NAME -> sourceFile.getName,
         METADATA_FILE_SIZE -> sourceFile.length(),
+        METADATA_FILE_BLOCK_START -> 0,
+        METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
         METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
       )
 
@@ -570,21 +594,26 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       checkAnswer(
         newDF.select(col("name"), col("age"), col("info"),
           col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+          col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START), col(METADATA_FILE_BLOCK_LENGTH),
           // since we are writing _metadata to a json file,
           // we should explicitly cast the column to timestamp type
-          col(METADATA_FILE_SIZE), to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+          to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
         Seq(
           Row(
             "jack", 24, Row(12345L, "uom"),
             sourceFileMetadata(METADATA_FILE_PATH),
             sourceFileMetadata(METADATA_FILE_NAME),
             sourceFileMetadata(METADATA_FILE_SIZE),
+            sourceFileMetadata(METADATA_FILE_BLOCK_START),
+            sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
             sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
           Row(
             "lily", 31, Row(54321L, "ucb"),
             sourceFileMetadata(METADATA_FILE_PATH),
             sourceFileMetadata(METADATA_FILE_NAME),
             sourceFileMetadata(METADATA_FILE_SIZE),
+            sourceFileMetadata(METADATA_FILE_BLOCK_START),
+            sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
             sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
         )
       )
@@ -799,4 +828,53 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-42423: Add metadata column file block start and length") {
+    withSQLConf(
+        SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1",
+        SQLConf.FILES_MAX_PARTITION_BYTES.key -> "1") {
+      withTempPath { path =>
+        spark.range(2).write.json(path.getCanonicalPath)
+        assert(path.listFiles().count(_.getName.endsWith("json")) == 1)
+
+        val df = spark.read.json(path.getCanonicalPath)
+          .select("id", METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH)
+        assert(df.rdd.partitions.length > 1)
+        val res = df.collect()
+        assert(res.length == 2)
+        assert(res.head.getLong(0) == 0) // id
+        assert(res.head.getLong(1) == 0) // file_block_start
+        assert(res.head.getLong(2) > 0) // file_block_length
+        assert(res(1).getLong(0) == 1L) // id
+        assert(res(1).getLong(1) > 0) // file_block_start
+        assert(res(1).getLong(2) > 0) // file_block_length
+
+        // make sure `_metadata.file_block_start` and `_metadata.file_block_length` does not affect
+        // pruning listed files
+        val df2 = spark.read.json(path.getCanonicalPath)
+          .where("_metadata.File_bLoCk_start > 0 and _metadata.file_block_length > 0 " +
+            "and _metadata.file_SizE > 0")
+          .select("id", METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH)
+        val fileSourceScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec])
+        assert(fileSourceScan2.isDefined)
+        val files2 = fileSourceScan2.get.asInstanceOf[FileSourceScanExec].selectedPartitions
+        assert(files2.length == 1 && files2.head.files.length == 1)
+        val res2 = df2.collect()
+        assert(res2.length == 1)
+        assert(res2.head.getLong(0) == 1L) // id
+        assert(res2.head.getLong(1) > 0) // file_block_start
+        assert(res2.head.getLong(2) > 0) // file_block_length
+
+        // make sure `_metadata.file_size > 1000000` still work for pruning listed files
+        val df3 = spark.read.json(path.getCanonicalPath)
+          .where("_metadata.File_bLoCk_start > 0 and _metadata.file_SizE > 1000000")
+          .select("id", METADATA_FILE_BLOCK_START, METADATA_FILE_BLOCK_LENGTH)
+        val fileSourceScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec])
+        assert(fileSourceScan3.isDefined)
+        val files3 = fileSourceScan3.get.asInstanceOf[FileSourceScanExec].selectedPartitions
+        assert(files3.length == 1 && files3.head.files.isEmpty)
+        assert(df3.collect().isEmpty)
+      }
+    }
+  }
 }

