You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/10/09 11:58:48 UTC
[spark] branch branch-2.4 updated: [SPARK-33101][ML][3.0] Make
LibSVM format propagate Hadoop config from DS options to underlying HDFS
file system
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 27b75af [SPARK-33101][ML][3.0] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system
27b75af is described below
commit 27b75afa7f4edf8672b92496626ffab8c78e25d4
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Oct 9 20:49:10 2020 +0900
[SPARK-33101][ML][3.0] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system
### What changes were proposed in this pull request?
Propagate LibSVM options to Hadoop configs in the LibSVM datasource.
### Why are the changes needed?
There is a bug when running:
```scala
spark.read.format("libsvm").options(conf).load(path)
```
The underlying file system will not receive the `conf` options.
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, users should be able to read files successfully from, for example, Azure Data Lake:
```scala
def hadoopConf1() = Map[String, String](
s"fs.adl.oauth2.access.token.provider.type" -> "ClientCredential",
s"fs.adl.oauth2.client.id" -> dbutils.secrets.get(scope = "...", key = "..."),
s"fs.adl.oauth2.credential" -> dbutils.secrets.get(scope = "...", key = "..."),
s"fs.adl.oauth2.refresh.url" -> s"https://login.microsoftonline.com/.../oauth2/token")
val df = spark.read.format("libsvm").options(hadoopConf1).load("adl://....azuredatalakestore.net/foldersp1/...")
```
without getting the following exception, which was thrown because the settings above were not propagated to the filesystem:
```java
java.lang.IllegalArgumentException: No value for fs.adl.oauth2.access.token.provider found in conf file.
at ....adl.AdlFileSystem.getNonEmptyVal(AdlFileSystem.java:820)
at ....adl.AdlFileSystem.getCustomAccessTokenProvider(AdlFileSystem.java:220)
at ....adl.AdlFileSystem.getAccessTokenProvider(AdlFileSystem.java:257)
at ....adl.AdlFileSystem.initialize(AdlFileSystem.java:164)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
```
### How was this patch tested?
Added a unit test to `LibSVMRelationSuite`.
Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 1234c66fa6b6d2c45edb40237788fa3bfdf96cf3)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
Closes #29986 from MaxGekk/ml-option-propagation-3.0.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit dcffa56f04a1764619a69789fbf7169ffea2bdd4)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 2 +-
.../main/scala/org/apache/spark/mllib/util/MLUtils.scala | 6 ++++--
.../spark/ml/source/libsvm/LibSVMRelationSuite.scala | 14 ++++++++++++--
3 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5795812..298adf9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -100,7 +100,7 @@ private[libsvm] class LibSVMFileFormat
"'numFeatures' option to avoid the extra scan.")
val paths = files.map(_.getPath.toString)
- val parsed = MLUtils.parseLibSVMFile(sparkSession, paths)
+ val parsed = MLUtils.parseLibSVMFile(sparkSession, paths, options)
MLUtils.computeNumFeatures(parsed)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index c8550cd..d9bab3b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -105,13 +105,15 @@ object MLUtils extends Logging {
}
private[spark] def parseLibSVMFile(
- sparkSession: SparkSession, paths: Seq[String]): RDD[(Double, Array[Int], Array[Double])] = {
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ options: Map[String, String]): RDD[(Double, Array[Int], Array[Double])] = {
val lines = sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
- options = Map(DataSource.GLOB_PATHS_KEY -> "false")
+ options = options ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false))
.select("value")
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index 28c770c..20a968c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -27,12 +27,13 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SaveMode}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.util.Utils
-class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
+class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext with SQLHelper {
// Path for dataset
var path: String = _
@@ -202,4 +203,13 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
val v = row1.getAs[SparseVector](1)
assert(v == Vectors.sparse(2, Seq((0, 2.0), (1, 3.0))))
}
+
+ test("SPARK-33101: should propagate Hadoop config from DS options to underlying file system") {
+ withSQLConf(
+ "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+ "fs.file.impl.disable.cache" -> "true") {
+ val df = spark.read.option("ds_option", "value").format("libsvm").load(path)
+ assert(df.columns(0) == "label")
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org