You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2020/06/09 19:16:52 UTC

[spark] branch master updated: [SPARK-31935][SQL] Hadoop file system config should be effective in data source options

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3771c6  [SPARK-31935][SQL] Hadoop file system config should be effective in data source options
f3771c6 is described below

commit f3771c6b47d0b3aef10b86586289a1f675c7cfe2
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Jun 9 12:15:07 2020 -0700

    [SPARK-31935][SQL] Hadoop file system config should be effective in data source options
    
    ### What changes were proposed in this pull request?
    
    Mkae Hadoop file system config effective in data source options.
    
    From `org.apache.hadoop.fs.FileSystem.java`:
    ```
      public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
    
        if (scheme == null && authority == null) {     // use default FS
          return get(conf);
        }
    
        if (scheme != null && authority == null) {     // no authority
          URI defaultUri = getDefaultUri(conf);
          if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
              && defaultUri.getAuthority() != null) {  // & default has authority
            return get(defaultUri, conf);              // return default
          }
        }
    
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        if (conf.getBoolean(disableCacheName, false)) {
          return createFileSystem(uri, conf);
        }
    
        return CACHE.get(uri, conf);
      }
    ```
    Before changes, the file system configurations in data source options are not propagated in `DataSource.scala`.
    After changes, we can specify authority and URI schema related configurations for scanning file systems.
    
    This problem only exists in data source V1. In V2, we already use `sparkSession.sessionState.newHadoopConfWithOptions(options)` in `FileTable`.
    ### Why are the changes needed?
    
    Allow users to specify authority and URI schema related Hadoop configurations for file source reading.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the file system related Hadoop configuration in data source option will be effective on reading.
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #28760 from gengliangwang/ds_conf.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@databricks.com>
---
 .../spark/sql/execution/datasources/DataSource.scala | 13 +++++++------
 .../apache/spark/sql/FileBasedDataSourceSuite.scala  | 20 ++++++++++++++++++++
 .../sql/execution/datasources/DataSourceSuite.scala  | 14 +++++++++++++-
 .../spark/sql/streaming/FileStreamSourceSuite.scala  | 12 ++++++++++++
 4 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 222fea6..07d7c4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -110,6 +110,9 @@ case class DataSource(
 
   private def providingInstance() = providingClass.getConstructor().newInstance()
 
+  private def newHadoopConfiguration(): Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(options)
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -231,7 +234,7 @@ case class DataSource(
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
         if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
-          val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+          val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
           }
@@ -358,7 +361,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
-            sparkSession.sessionState.newHadoopConf(),
+            newHadoopConfiguration(),
             sparkSession.sessionState.conf) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
@@ -450,7 +453,7 @@ case class DataSource(
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
       val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      val fs = path.getFileSystem(newHadoopConfiguration())
       path.makeQualified(fs.getUri, fs.getWorkingDirectory)
     } else {
       throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
@@ -570,9 +573,7 @@ case class DataSource(
       checkEmptyGlobPath: Boolean,
       checkFilesExist: Boolean): Seq[Path] = {
     val allPaths = caseInsensitiveOptions.get("path") ++ paths
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, hadoopConf,
+    DataSource.checkAndGlobPathIfNecessary(allPaths.toSeq, newHadoopConfiguration(),
       checkEmptyGlobPath, checkFilesExist)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index cb410b4..efc7cac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -843,6 +843,26 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    Seq("parquet", "").foreach { format =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
+        withTempDir { dir =>
+          val path = dir.getCanonicalPath
+          val defaultFs = "nonexistFS://nonexistFS"
+          val expectMessage = "No FileSystem for scheme: nonexistFS"
+          val message1 = intercept[java.io.IOException] {
+            spark.range(10).write.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message1 == expectMessage)
+          val message2 = intercept[java.io.IOException] {
+            spark.read.option("fs.defaultFS", defaultFs).parquet(path)
+          }.getMessage
+          assert(message2 == expectMessage)
+        }
+      }
+    }
+  }
+
   test("SPARK-31116: Select nested schema with case insensitive mode") {
     // This test case failed at only Parquet. ORC is added for test coverage parity.
     Seq("orc", "parquet").foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
index 1e3c660..9345158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.test.SharedSparkSession
 
-class DataSourceSuite extends SharedSparkSession {
+class DataSourceSuite extends SharedSparkSession with PrivateMethodTester {
   import TestPaths._
 
   test("test glob and non glob paths") {
@@ -132,6 +133,17 @@ class DataSourceSuite extends SharedSparkSession {
       )
     )
   }
+
+  test("Data source options should be propagated in method checkAndGlobPathIfNecessary") {
+    val dataSourceOptions = Map("fs.defaultFS" -> "nonexistsFs://nonexistsFs")
+    val dataSource = DataSource(spark, "parquet", Seq("/path3"), options = dataSourceOptions)
+    val checkAndGlobPathIfNecessary = PrivateMethod[Seq[Path]]('checkAndGlobPathIfNecessary)
+
+    val message = intercept[java.io.IOException] {
+      dataSource invokePrivate checkAndGlobPathIfNecessary(false, false)
+    }.getMessage
+    assert(message.equals("No FileSystem for scheme: nonexistsFs"))
+  }
 }
 
 object TestPaths {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa32033..32dceaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -532,6 +532,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-31935: Hadoop file system config should be effective in data source options") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val defaultFs = "nonexistFS://nonexistFS"
+      val expectMessage = "No FileSystem for scheme: nonexistFS"
+      val message = intercept[java.io.IOException] {
+        spark.readStream.option("fs.defaultFS", defaultFs).text(path)
+      }.getMessage
+      assert(message == expectMessage)
+    }
+  }
+
   test("read from textfile") {
     withTempDirs { case (src, tmp) =>
       val textStream = spark.readStream.textFile(src.getCanonicalPath)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org