Posted to commits@spark.apache.org by we...@apache.org on 2019/04/18 06:39:55 UTC

[spark] branch master updated: [SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d44ba0  [SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()
7d44ba0 is described below

commit 7d44ba05d1d21d2488b075334569db47eacf424c
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Apr 18 14:39:30 2019 +0800

    [SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()
    
    ## What changes were proposed in this pull request?
    
    Currently, a `Dataset` backed by file source V2 always returns an empty result for the method `Dataset.inputFiles()`.
    
    This PR fixes it by adding a case for file-based `DataSourceV2Relation` in `Dataset.inputFiles()`, delegating to the table's file index.
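    
    For reference, a minimal sketch of how the fixed behaviour can be verified from user code. The session setup, path, and sample data below are illustrative, not part of this patch; only `Dataset.inputFiles()` and `SQLConf.USE_V1_SOURCE_READER_LIST` come from the change itself:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf
    
    val spark = SparkSession.builder().master("local[*]").appName("inputFilesCheck").getOrCreate()
    import spark.implicits._
    
    // An empty V1 source list keeps the read on the file source V2 code path.
    spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
    
    val dir = "/tmp/inputfiles-demo/parquet"  // illustrative location
    Seq((1, 22)).toDF("a", "b").write.mode("overwrite").parquet(dir)
    
    val parquetDF = spark.read.parquet(dir)
    // Before this patch the V2 read reported no input files here;
    // with the fix it lists the underlying Parquet data files.
    assert(parquetDF.inputFiles.nonEmpty)
    ```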
    
    ## How was this patch tested?
    
    Unit test: the existing `inputFiles` test in `DataFrameSuite` is reworked to cover both the file source V1 and V2 code paths; an equivalent interactive check is sketched below.
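    
    A rough spark-shell style sketch of the same check (paths and data are illustrative; `spark` and its implicits are provided by the shell). With `csv` in the V1 list the csv read should stay on the V1 path while the parquet read goes through V2, and the union reports the de-duplicated files of both inputs:
    
    ```scala
    import org.apache.spark.sql.internal.SQLConf
    
    // Keep csv on the V1 code path while parquet uses file source V2.
    spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "csv")
    
    val base = "/tmp/inputfiles-demo"  // illustrative location
    val df = Seq((1, 22)).toDF("a", "b")
    df.write.mode("overwrite").parquet(s"$base/parquet")
    df.write.mode("overwrite").csv(s"$base/csv")
    
    val parquetDF = spark.read.parquet(s"$base/parquet")
    val csvDF = spark.read.csv(s"$base/csv")
    assert(parquetDF.inputFiles.nonEmpty && csvDF.inputFiles.nonEmpty)
    
    // The union's input files are the distinct files of both sides.
    val unioned = csvDF.union(parquetDF).inputFiles.sorted
    val allFiles = (csvDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
    assert(unioned.sameElements(allFiles))
    ```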
    
    Closes #24393 from gengliangwang/inputFiles.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  3 ++
 .../org/apache/spark/sql/DataFrameSuite.scala      | 36 ++++++++++++----------
 2 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 05015f8..793714f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.streaming.DataStreamWriter
@@ -3175,6 +3176,8 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
+      case DataSourceV2Relation(table: FileTable, _, _) =>
+        table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 78decd4..8a9c526 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -762,22 +762,26 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("inputFiles") {
-    withTempDir { dir =>
-      val df = Seq((1, 22)).toDF("a", "b")
-
-      val parquetDir = new File(dir, "parquet").getCanonicalPath
-      df.write.parquet(parquetDir)
-      val parquetDF = spark.read.parquet(parquetDir)
-      assert(parquetDF.inputFiles.nonEmpty)
-
-      val jsonDir = new File(dir, "json").getCanonicalPath
-      df.write.json(jsonDir)
-      val jsonDF = spark.read.json(jsonDir)
-      assert(parquetDF.inputFiles.nonEmpty)
-
-      val unioned = jsonDF.union(parquetDF).inputFiles.sorted
-      val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
-      assert(unioned === allFiles)
+    Seq("csv", "").foreach { useV1List =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List) {
+        withTempDir { dir =>
+          val df = Seq((1, 22)).toDF("a", "b")
+
+          val parquetDir = new File(dir, "parquet").getCanonicalPath
+          df.write.parquet(parquetDir)
+          val parquetDF = spark.read.parquet(parquetDir)
+          assert(parquetDF.inputFiles.nonEmpty)
+
+          val csvDir = new File(dir, "csv").getCanonicalPath
+          df.write.csv(csvDir)
+          val csvDF = spark.read.csv(csvDir)
+          assert(csvDF.inputFiles.nonEmpty)
+
+          val unioned = csvDF.union(parquetDF).inputFiles.sorted
+          val allFiles = (csvDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
+          assert(unioned === allFiles)
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org