Posted to commits@spark.apache.org by gu...@apache.org on 2020/10/09 12:18:39 UTC

[spark] branch branch-3.0 updated: [SPARK-33094][SQL][3.0] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9892b3e  [SPARK-33094][SQL][3.0] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
9892b3e is described below

commit 9892b3e452de490f7d2346bacf971ec13b92b219
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Oct 9 21:14:26 2020 +0900

    [SPARK-33094][SQL][3.0] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
    
    ### What changes were proposed in this pull request?
    Propagate ORC options to Hadoop configs in Hive `OrcFileFormat` and in the regular ORC datasource.
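    
    Conceptually, the fix copies every datasource option onto the Hadoop `Configuration` that is later used to resolve the `FileSystem`. A minimal sketch of the idea, not Spark's exact code (internally, `SessionState.newHadoopConfWithOptions` plays this role):
    ```scala
    import org.apache.hadoop.conf.Configuration

    // Sketch only: copy datasource options onto a fresh Hadoop config so the
    // FileSystem instantiated from it can see them.
    def hadoopConfWithOptions(base: Configuration, options: Map[String, String]): Configuration = {
      val conf = new Configuration(base)
      options.foreach { case (k, v) => conf.set(k, v) }
      conf
    }
    ```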
    
    ### Why are the changes needed?
    There is a bug: when running
    ```scala
    spark.read.format("orc").options(conf).load(path)
    ```
    the underlying file system does not receive the `conf` options.
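    
    After this fix, such options reach the file system that actually opens the ORC files. An illustrative example (the `fs.s3a.connection.maximum` key below is only a sample Hadoop option, not something this PR adds):
    ```scala
    val conf = Map(
      "fs.s3a.connection.maximum" -> "200", // sample Hadoop FS option
      "mergeSchema" -> "true")              // regular ORC datasource option
    // Both entries now end up on the Hadoop Configuration handed to the FileSystem.
    val df = spark.read.format("orc").options(conf).load(path)
    ```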
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Hadoop configs passed as datasource options now reach the underlying file system.
    
    ### How was this patch tested?
    Added UT to `OrcSourceSuite`.
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
    (cherry picked from commit c5f6af9f17498bb0ec393c16616f2d99e5d3ee3d)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #29985 from MaxGekk/orc-option-propagation-3.0.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../spark/sql/execution/datasources/orc/OrcUtils.scala  |  6 +++---
 .../sql/execution/datasources/orc/OrcSourceSuite.scala  | 17 ++++++++++++++++-
 .../org/apache/spark/sql/hive/orc/OrcFileFormat.scala   |  2 +-
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index be36432..c29287f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -81,10 +81,10 @@ object OrcUtils extends Logging {
     }
   }
 
-  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
+  def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
-    val conf = sparkSession.sessionState.newHadoopConf()
+    val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
     files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
         logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
@@ -125,7 +125,7 @@ object OrcUtils extends Logging {
       SchemaMergeUtils.mergeSchemasInParallel(
         sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
     } else {
-      OrcUtils.readSchema(sparkSession, files)
+      OrcUtils.readSchema(sparkSession, files, options)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index b70fd74..5fa23a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -32,7 +32,7 @@ import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
-import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -538,6 +538,21 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
       }
     }
   }
+
+  test("SPARK-33094: should propagate Hadoop config from DS options to underlying file system") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      Seq(false, true).foreach { mergeSchema =>
+        withTempPath { dir =>
+          val path = dir.getAbsolutePath
+          val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
+          spark.range(1).write.options(conf).orc(path)
+          checkAnswer(spark.read.options(conf).orc(path), Row(0))
+        }
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 356b92b..d1ee1ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -75,7 +75,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
       OrcFileOperator.readSchema(
         files.map(_.getPath.toString),
-        Some(sparkSession.sessionState.newHadoopConf()),
+        Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
         ignoreCorruptFiles
       )
     }

