You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/10/09 12:18:39 UTC
[spark] branch branch-3.0 updated: [SPARK-33094][SQL][3.0] Make ORC
format propagate Hadoop config from DS options to underlying HDFS file
system
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9892b3e [SPARK-33094][SQL][3.0] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
9892b3e is described below
commit 9892b3e452de490f7d2346bacf971ec13b92b219
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Oct 9 21:14:26 2020 +0900
[SPARK-33094][SQL][3.0] Make ORC format propagate Hadoop config from DS options to underlying HDFS file system
### What changes were proposed in this pull request?
Propagate ORC options to Hadoop configs in Hive `OrcFileFormat` and in the regular ORC datasource.
### Why are the changes needed?
There is a bug: when running
```scala
spark.read.format("orc").options(conf).load(path)
```
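the underlying file system does not receive the options in `conf`, because the ORC schema is read with a Hadoop configuration created without the DataSource options.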
### Does this PR introduce _any_ user-facing change?
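Yes. Hadoop configuration options passed as DataSource options to the ORC reader are now propagated to the underlying file system when the ORC schema is read.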
### How was this patch tested?
Added a unit test to `OrcSourceSuite`.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit c5f6af9f17498bb0ec393c16616f2d99e5d3ee3d)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #29985 from MaxGekk/orc-option-propagation-3.0.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../spark/sql/execution/datasources/orc/OrcUtils.scala | 6 +++---
.../sql/execution/datasources/orc/OrcSourceSuite.scala | 17 ++++++++++++++++-
.../org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 2 +-
3 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index be36432..c29287f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -81,10 +81,10 @@ object OrcUtils extends Logging {
}
}
- def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
+ def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
: Option[StructType] = {
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
- val conf = sparkSession.sessionState.newHadoopConf()
+ val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
case Some(schema) =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
@@ -125,7 +125,7 @@ object OrcUtils extends Logging {
SchemaMergeUtils.mergeSchemasInParallel(
sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
} else {
- OrcUtils.readSchema(sparkSession, files)
+ OrcUtils.readSchema(sparkSession, files, options)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index b70fd74..5fa23a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -32,7 +32,7 @@ import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
-import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -538,6 +538,21 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}
+
+ test("SPARK-33094: should propagate Hadoop config from DS options to underlying file system") {
+ withSQLConf(
+ "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+ "fs.file.impl.disable.cache" -> "true") {
+ Seq(false, true).foreach { mergeSchema =>
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
+ spark.range(1).write.options(conf).orc(path)
+ checkAnswer(spark.read.options(conf).orc(path), Row(0))
+ }
+ }
+ }
+ }
}
class OrcSourceSuite extends OrcSuite with SharedSparkSession {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 356b92b..d1ee1ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -75,7 +75,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
- Some(sparkSession.sessionState.newHadoopConf()),
+ Some(sparkSession.sessionState.newHadoopConfWithOptions(options)),
ignoreCorruptFiles
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org