You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/18 06:39:55 UTC
[spark] branch master updated: [SPARK-27490][SQL] File source V2:
return correct result for Dataset.inputFiles()
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7d44ba0 [SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()
7d44ba0 is described below
commit 7d44ba05d1d21d2488b075334569db47eacf424c
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Thu Apr 18 14:39:30 2019 +0800
[SPARK-27490][SQL] File source V2: return correct result for Dataset.inputFiles()
## What changes were proposed in this pull request?
Currently, a `Dataset` with file source V2 always returns empty results for method `Dataset.inputFiles()`.
We should fix it.
## How was this patch tested?
Unit test
Closes #24393 from gengliangwang/inputFiles.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 3 ++
.../org/apache/spark/sql/DataFrameSuite.scala | 36 ++++++++++++----------
2 files changed, 23 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 05015f8..793714f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.streaming.DataStreamWriter
@@ -3175,6 +3176,8 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
+ case DataSourceV2Relation(table: FileTable, _, _) =>
+ table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 78decd4..8a9c526 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -762,22 +762,26 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("inputFiles") {
- withTempDir { dir =>
- val df = Seq((1, 22)).toDF("a", "b")
-
- val parquetDir = new File(dir, "parquet").getCanonicalPath
- df.write.parquet(parquetDir)
- val parquetDF = spark.read.parquet(parquetDir)
- assert(parquetDF.inputFiles.nonEmpty)
-
- val jsonDir = new File(dir, "json").getCanonicalPath
- df.write.json(jsonDir)
- val jsonDF = spark.read.json(jsonDir)
- assert(parquetDF.inputFiles.nonEmpty)
-
- val unioned = jsonDF.union(parquetDF).inputFiles.sorted
- val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
- assert(unioned === allFiles)
+ Seq("csv", "").foreach { useV1List =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List) {
+ withTempDir { dir =>
+ val df = Seq((1, 22)).toDF("a", "b")
+
+ val parquetDir = new File(dir, "parquet").getCanonicalPath
+ df.write.parquet(parquetDir)
+ val parquetDF = spark.read.parquet(parquetDir)
+ assert(parquetDF.inputFiles.nonEmpty)
+
+ val csvDir = new File(dir, "csv").getCanonicalPath
+ df.write.json(csvDir)
+ val csvDF = spark.read.json(csvDir)
+ assert(csvDF.inputFiles.nonEmpty)
+
+ val unioned = csvDF.union(parquetDF).inputFiles.sorted
+ val allFiles = (csvDF.inputFiles ++ parquetDF.inputFiles).distinct.sorted
+ assert(unioned === allFiles)
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org