You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2020/07/01 22:13:33 UTC
[spark] branch branch-3.0 updated: [SPARK-31935][SQL][FOLLOWUP]
Hadoop file system config should be effective in data source options
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fef3379 [SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
fef3379 is described below
commit fef337935179248e88aa4be84a4214fea1c3a11a
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jul 2 06:09:54 2020 +0800
[SPARK-31935][SQL][FOLLOWUP] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/28760 to fix the remaining issues:
1. should consider data source options when refreshing cache by path at the end of `InsertIntoHadoopFsRelationCommand`
2. should consider data source options when inferring schema for file source
3. should consider data source options when getting the qualified path in file source v2.
### Why are the changes needed?
We didn't catch these issues in https://github.com/apache/spark/pull/28760, because the test case only checks for an error when initializing the file system. If we initialize the file system multiple times during a simple read/write action, the test case actually only tests the first initialization.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
rewrite the test to make sure the entire data source read/write action can succeed.
Closes #28948 from cloud-fan/fix.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@databricks.com>
(cherry picked from commit 6edb20df834f7f9b85c1559ef78a3d0d2272e4df)
Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
.../spark/sql/v2/avro/AvroDataSourceV2.scala | 4 +--
.../apache/spark/sql/execution/CacheManager.scala | 15 +++++++----
.../InsertIntoHadoopFsRelationCommand.scala | 2 +-
.../execution/datasources/SchemaMergeUtils.scala | 4 ++-
.../sql/execution/datasources/orc/OrcUtils.scala | 2 +-
.../datasources/parquet/ParquetFileFormat.scala | 3 ++-
.../datasources/parquet/ParquetUtils.scala | 2 +-
.../datasources/v2/FileDataSourceV2.scala | 13 ++++++---
.../datasources/v2/csv/CSVDataSourceV2.scala | 4 +--
.../datasources/v2/json/JsonDataSourceV2.scala | 4 +--
.../datasources/v2/orc/OrcDataSourceV2.scala | 4 +--
.../v2/parquet/ParquetDataSourceV2.scala | 4 +--
.../datasources/v2/text/TextDataSourceV2.scala | 4 +--
.../spark/sql/FileBasedDataSourceSuite.scala | 31 +++++++++++++---------
.../execution/datasources/orc/OrcSourceSuite.scala | 4 +--
.../apache/spark/sql/hive/orc/OrcFileFormat.scala | 4 +--
16 files changed, 59 insertions(+), 45 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
index c6f52d6..969dee0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
@@ -31,13 +31,13 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 52cec8b..7d86c48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -248,12 +248,17 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
* `HadoopFsRelation` node(s) as part of its logical plan.
*/
def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
- val (fs, qualifiedPath) = {
- val path = new Path(resourcePath)
- val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
- (fs, fs.makeQualified(path))
- }
+ val path = new Path(resourcePath)
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ recacheByPath(spark, path, fs)
+ }
+ /**
+ * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
+ * `HadoopFsRelation` node(s) as part of its logical plan.
+ */
+ def recacheByPath(spark: SparkSession, resourcePath: Path, fs: FileSystem): Unit = {
+ val qualifiedPath = fs.makeQualified(resourcePath)
recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index f119721..fe733f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -192,7 +192,7 @@ case class InsertIntoHadoopFsRelationCommand(
// refresh cached files in FileIndex
fileIndex.foreach(_.refresh())
// refresh data cache if table is cached
- sparkSession.catalog.refreshByPath(outputPath.toString)
+ sparkSession.sharedState.cacheManager.recacheByPath(sparkSession, outputPath, fs)
if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
index 99882b0..28097c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala
@@ -32,10 +32,12 @@ object SchemaMergeUtils extends Logging {
*/
def mergeSchemasInParallel(
sparkSession: SparkSession,
+ parameters: Map[String, String],
files: Seq[FileStatus],
schemaReader: (Seq[FileStatus], Configuration, Boolean) => Seq[StructType])
: Option[StructType] = {
- val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
+ val serializedConf = new SerializableConfiguration(
+ sparkSession.sessionState.newHadoopConfWithOptions(parameters))
// !! HACK ALERT !!
// Here is a hack for Parquet, but it can be used by Orc as well.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index eea9b2a..d274bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -109,7 +109,7 @@ object OrcUtils extends Logging {
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
SchemaMergeUtils.mergeSchemasInParallel(
- sparkSession, files, OrcUtils.readOrcSchemasInParallel)
+ sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
} else {
OrcUtils.readSchema(sparkSession, files)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 7187410..68f49f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -475,6 +475,7 @@ object ParquetFileFormat extends Logging {
* S3 nodes).
*/
def mergeSchemasInParallel(
+ parameters: Map[String, String],
filesToTouch: Seq[FileStatus],
sparkSession: SparkSession): Option[StructType] = {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
@@ -490,7 +491,7 @@ object ParquetFileFormat extends Logging {
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
}
- SchemaMergeUtils.mergeSchemasInParallel(sparkSession, filesToTouch, reader)
+ SchemaMergeUtils.mergeSchemasInParallel(sparkSession, parameters, filesToTouch, reader)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 7e7dba9..b91d75c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -104,7 +104,7 @@ object ParquetUtils {
.orElse(filesByType.data.headOption)
.toSeq
}
- ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
+ ParquetFileFormat.mergeSchemasInParallel(parameters, filesToTouch, sparkSession)
}
case class FileTypes(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 30a964d..bbe8835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -18,7 +18,10 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util
+import scala.collection.JavaConverters._
+
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
@@ -53,14 +56,16 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
paths ++ Option(map.get("path")).toSeq
}
- protected def getTableName(paths: Seq[String]): String = {
- val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+ protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
+ map.asCaseSensitiveMap().asScala.toMap)
+ val name = shortName() + " " + paths.map(qualifiedPathName(_, hadoopConf)).mkString(",")
Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
}
- private def qualifiedPathName(path: String): String = {
+ private def qualifiedPathName(path: String, hadoopConf: Configuration): String = {
val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ val fs = hdfsPath.getFileSystem(hadoopConf)
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 1f99d42..69d001b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -31,13 +31,13 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
index 7a0949e..9c4e3b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -31,13 +31,13 @@ class JsonDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index 8665af3..fa2febd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -31,13 +31,13 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
index 8cb6186..7e7ca96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
@@ -31,13 +31,13 @@ class ParquetDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
index 049c717..43bcb61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -31,13 +31,13 @@ class TextDataSourceV2 extends FileDataSourceV2 {
override def getTable(options: CaseInsensitiveStringMap): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
}
override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
val paths = getPaths(options)
- val tableName = getTableName(paths)
+ val tableName = getTableName(options, paths)
TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index d8157d3..231a8f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql
import java.io.{File, FileNotFoundException}
+import java.net.URI
import java.nio.file.{Files, StandardOpenOption}
import java.util.Locale
import scala.collection.mutable
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
@@ -845,19 +847,15 @@ class FileBasedDataSourceSuite extends QueryTest
test("SPARK-31935: Hadoop file system config should be effective in data source options") {
Seq("parquet", "").foreach { format =>
- withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+ withSQLConf(
+ SQLConf.USE_V1_SOURCE_LIST.key -> format,
+ "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+ "fs.file.impl.disable.cache" -> "true") {
withTempDir { dir =>
- val path = dir.getCanonicalPath
- val defaultFs = "nonexistFS://nonexistFS"
- val expectMessage = "No FileSystem for scheme nonexistFS"
- val message1 = intercept[java.io.IOException] {
- spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
- }.getMessage
- assert(message1.filterNot(Set(':', '"').contains) == expectMessage)
- val message2 = intercept[java.io.IOException] {
- spark.read.option("fs.defaultFS", defaultFs).parquet(path)
- }.getMessage
- assert(message2.filterNot(Set(':', '"').contains) == expectMessage)
+ val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
+ spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
+ checkAnswer(
+ spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
}
}
}
@@ -932,3 +930,10 @@ object TestingUDT {
override def userClass: Class[NullData] = classOf[NullData]
}
}
+
+class FakeFileSystemRequiringDSOption extends LocalFileSystem {
+ override def initialize(name: URI, conf: Configuration): Unit = {
+ super.initialize(name, conf)
+ require(conf.get("ds_option", "") == "value")
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 7387368..b70fd74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -213,9 +213,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
val schema = SchemaMergeUtils.mergeSchemasInParallel(
- spark,
- fileStatuses,
- schemaReader)
+ spark, Map.empty, fileStatuses, schemaReader)
assert(schema.isDefined)
assert(schema.get == StructType(Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 7f2eb14..356b92b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -70,9 +70,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
if (orcOptions.mergeSchema) {
SchemaMergeUtils.mergeSchemasInParallel(
- sparkSession,
- files,
- OrcFileOperator.readOrcSchemasInParallel)
+ sparkSession, options, files, OrcFileOperator.readOrcSchemasInParallel)
} else {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org