Posted to commits@spark.apache.org by ge...@apache.org on 2020/06/09 19:16:52 UTC
[spark] branch master updated: [SPARK-31935][SQL] Hadoop file system config should be effective in data source options
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f3771c6 [SPARK-31935][SQL] Hadoop file system config should be effective in data source options
f3771c6 is described below
commit f3771c6b47d0b3aef10b86586289a1f675c7cfe2
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Jun 9 12:15:07 2020 -0700
[SPARK-31935][SQL] Hadoop file system config should be effective in data source options
### What changes were proposed in this pull request?
Make Hadoop file system configs effective in data source options.
From `org.apache.hadoop.fs.FileSystem.java`:
```
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
```
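As the snippet shows, `FileSystem.get` resolves a path's scheme and authority against whatever `Configuration` it is handed, so passing a per-source conf changes which file system a schemeless path resolves to. A minimal illustration (the `fs.defaultFS` value and path below are placeholders, not from this patch):
```
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://namenode:8020") // assumed default FS for the demo

// A URI with no scheme and no authority falls into the first branch above
// and resolves against the fs.defaultFS of the conf we passed in.
val fs = FileSystem.get(new URI("/user/data"), conf)
```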
Before this change, the file system configurations in data source options were not propagated in `DataSource.scala`.
After this change, we can specify authority- and URI-scheme-related configurations for the file systems being scanned.
This problem only exists in data source V1; in V2, we already use `sparkSession.sessionState.newHadoopConfWithOptions(options)` in `FileTable`, as sketched below.
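Conceptually, the merge copies the session-level Hadoop conf and overlays the per-source options on top. A rough sketch of that behavior (a hypothetical helper, not the exact Spark implementation):
```
import org.apache.hadoop.conf.Configuration

// Hypothetical helper mirroring what newHadoopConfWithOptions does conceptually:
// copy the session conf, then let the data source options win on conflicts.
def hadoopConfWithOptions(base: Configuration, options: Map[String, String]): Configuration = {
  val conf = new Configuration(base) // copy constructor keeps session-level settings
  options.foreach { case (k, v) => conf.set(k, v) } // per-source options override
  conf
}
```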
### Why are the changes needed?
Allow users to specify authority- and URI-scheme-related Hadoop configurations for file source reads.
### Does this PR introduce _any_ user-facing change?
Yes: file-system-related Hadoop configurations passed as data source options now take effect on reads.
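For example, a read can carry its own file system settings, as in this sketch (the S3A endpoint, credentials, and bucket are illustrative placeholders, not part of this patch):
```
val df = spark.read
  .option("fs.s3a.endpoint", "http://localhost:9000") // assumed custom endpoint
  .option("fs.s3a.access.key", "ACCESS_KEY")          // placeholder credentials
  .option("fs.s3a.secret.key", "SECRET_KEY")
  .parquet("s3a://my-bucket/data")
```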
### How was this patch tested?
Unit tests.
Closes #28760 from gengliangwang/ds_conf.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
.../spark/sql/execution/datasources/DataSource.scala | 13 +++++++------
.../apache/spark/sql/FileBasedDataSourceSuite.scala | 20 ++++++++++++++++++++
.../sql/execution/datasources/DataSourceSuite.scala | 14 +++++++++++++-
.../spark/sql/streaming/FileStreamSourceSuite.scala | 12 ++++++++++++
4 files changed, 52 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 222fea6..07d7c4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -110,6 +110,9 @@ case class DataSource(
private def providingInstance() = providingClass.getConstructor().newInstance()
+ private def newHadoopConfiguration(): Configuration =
+ sparkSession.sessionState.newHadoopConfWithOptions(options)
+
lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -231,7 +234,7 @@ case class DataSource(
// once the streaming job starts and some upstream source starts dropping data.
val hdfsPath = new Path(path)
if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
- val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
if (!fs.exists(hdfsPath)) {
throw new AnalysisException(s"Path does not exist: $path")
}
@@ -358,7 +361,7 @@ case class DataSource(
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
- sparkSession.sessionState.newHadoopConf(),
+ newHadoopConfiguration(),
sparkSession.sessionState.conf) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -450,7 +453,7 @@ case class DataSource(
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
- val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ val fs = path.getFileSystem(newHadoopConfiguration())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -570,9 +573,7 @@ case class DataSource(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
- val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
- DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+ DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
checkEmptyGlobPath, checkFilesExist)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index cb410b4..efc7cac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
+ test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+ Seq("parquet", "").foreach { format =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ val defaultFs = "nonexistFS://nonexistFS"
+ val expectMessage = "No FileSystem for scheme: nonexistFS"
+ val message1 = intercept[java.io.IOException] {
+ spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+ }.getMessage
+ assert(message1 == expectMessage)
+ val message2 = intercept[java.io.IOException] {
+ spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+ }.getMessage
+ assert(message2 == expectMessage)
+ }
+ }
+ }
+ }
+
test("SPARK-31116: Select nested schema with case insensitive mode") {
// This test case failed at only Parquet. ORC is added for test coverage parity.
Seq("orc", "parquet").foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 1e3c660..9345158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.scalatest.PrivateMethodTester
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.test.SharedSparkSession
-class DataSourceSuite extends SharedSparkSession {
+class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
import TestPaths._
test("test glob and non glob paths") {
@@ -132,6 +133,17 @@ class DataSourceSuite extends SharedSparkSession {
)
)
}
+
+ test("Data source options should be propagated in method checkAndGlobPathIfNecessary") {
+ val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs")
+ val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions)
+ val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary)
+
+ val message = intercept[java.io.IOException] {
+ dataSource invokePrivate checkAndGlobPathIfNecessary(false, false)
+ }.getMessage
+ assert(message.equals("No FileSystem for scheme: nonexistsFs"))
+ }
}
object TestPaths {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa32033..32dceaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ val defaultFs = "nonexistFS://nonexistFS"
+ val expectMessage = "No FileSystem for scheme: nonexistFS"
+ val message = intercept[java.io.IOException] {
+ spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+ }.getMessage
+ assert(message == expectMessage)
+ }
+ }
+
test("read from textfile") {
withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.textFile(src.getCanonicalPath)