You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/05/14 06:22:40 UTC
[spark] branch branch-3.0 updated: [SPARK-31692][SQL] Pass hadoop
confs specifed via Spark confs to URLStreamHandlerfactory
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d639a12 [SPARK-31692][SQL] Pass hadoop confs specifed via Spark confs to URLStreamHandlerfactory
d639a12 is described below
commit d639a12ef243e1e8d20bd06d3a97d00e47f05517
Author: Karuppayya Rajendran <ka...@gmail.com>
AuthorDate: Wed May 13 23:18:38 2020 -0700
[SPARK-31692][SQL] Pass hadoop confs specifed via Spark confs to URLStreamHandlerfactory
### What changes were proposed in this pull request?
Pass Hadoop confs specified via Spark confs to the URLStreamHandlerFactory.
### Why are the changes needed?
**BEFORE**
```
➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState5793cd84
scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream = org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream22846025
scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem5a930c03
```
**AFTER**
```
➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
scala> spark.sharedState
res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState5c24a636
scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream2ba8f528
scala> import org.apache.hadoop.fs._
import org.apache.hadoop.fs._
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration)
res2: org.apache.hadoop.fs.FileSystem = LocalFS
scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration).getClass
res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class org.apache.hadoop.fs.RawLocalFileSystem
```
The type of the FileSystem object created in the two cases above is different (compare the last statement in each snippet), which should not be the case.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested locally.
Added a unit test (`SharedStateSuite`).
Closes #28516 from karuppayya/SPARK-31692.
Authored-by: Karuppayya Rajendran <ka...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 72601460ada41761737f39d5dff8e69444fce2ba)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/internal/SharedState.scala | 6 +--
.../spark/sql/internal/SharedStateSuite.scala | 55 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 47119ab..ce4385d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -53,7 +53,7 @@ private[sql] class SharedState(
initialConfigs: scala.collection.Map[String, String])
extends Logging {
- SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
+ SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
private val (conf, hadoopConf) = {
// Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
@@ -174,13 +174,13 @@ private[sql] class SharedState(
object SharedState extends Logging {
@volatile private var fsUrlStreamHandlerFactoryInitialized = false
- private def setFsUrlStreamHandlerFactory(conf: SparkConf): Unit = {
+ private def setFsUrlStreamHandlerFactory(conf: SparkConf, hadoopConf: Configuration): Unit = {
if (!fsUrlStreamHandlerFactoryInitialized &&
conf.get(DEFAULT_URL_STREAM_HANDLER_FACTORY_ENABLED)) {
synchronized {
if (!fsUrlStreamHandlerFactoryInitialized) {
try {
- URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
+ URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))
fsUrlStreamHandlerFactoryInitialized = true
} catch {
case NonFatal(_) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
new file mode 100644
index 0000000..81bf153
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.net.URL
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+/**
+ * Tests for [[org.apache.spark.sql.internal.SharedState]].
+ */
+class SharedStateSuite extends SharedSparkSession {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.hadoop.fs.defaultFS", "file:///")
+ }
+
+ test("SPARK-31692: Url handler factory should have the hadoop configs from Spark conf") {
+ // Accessing shared state to init the object since it is `lazy val`
+ spark.sharedState
+ val field = classOf[URL].getDeclaredField("factory")
+ field.setAccessible(true)
+ val value = field.get(null)
+ assert(value.isInstanceOf[FsUrlStreamHandlerFactory])
+ val streamFactory = value.asInstanceOf[FsUrlStreamHandlerFactory]
+
+ val confField = classOf[FsUrlStreamHandlerFactory].getDeclaredField("conf")
+ confField.setAccessible(true)
+ val conf = confField.get(streamFactory)
+
+ assert(conf.isInstanceOf[Configuration])
+ assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org