You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/10/09 11:58:48 UTC

[spark] branch branch-2.4 updated: [SPARK-33101][ML][3.0] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 27b75af  [SPARK-33101][ML][3.0] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system
27b75af is described below

commit 27b75afa7f4edf8672b92496626ffab8c78e25d4
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Oct 9 20:49:10 2020 +0900

    [SPARK-33101][ML][3.0] Make LibSVM format propagate Hadoop config from DS options to underlying HDFS file system
    
    ### What changes were proposed in this pull request?
    Propagate LibSVM options to Hadoop configs in the LibSVM datasource.
    
    ### Why are the changes needed?
    There is a bug when running:
    ```scala
    spark.read.format("libsvm").options(conf).load(path)
    ```
    The underlying file system will not receive the `conf` options.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, users can, for example, successfully read files from Azure Data Lake:
    ```scala
    def hadoopConf1() = Map[String, String](
      s"fs.adl.oauth2.access.token.provider.type" -> "ClientCredential",
      s"fs.adl.oauth2.client.id" -> dbutils.secrets.get(scope = "...", key = "..."),
      s"fs.adl.oauth2.credential" -> dbutils.secrets.get(scope = "...", key = "..."),
      s"fs.adl.oauth2.refresh.url" -> s"https://login.microsoftonline.com/.../oauth2/token")
    val df = spark.read.format("libsvm").options(hadoopConf1).load("adl://....azuredatalakestore.net/foldersp1/...")
    ```
    and no longer get the following exception, which was thrown because the settings above were not propagated to the file system:
    ```java
    java.lang.IllegalArgumentException: No value for fs.adl.oauth2.access.token.provider found in conf file.
    	at ....adl.AdlFileSystem.getNonEmptyVal(AdlFileSystem.java:820)
    	at ....adl.AdlFileSystem.getCustomAccessTokenProvider(AdlFileSystem.java:220)
    	at ....adl.AdlFileSystem.getAccessTokenProvider(AdlFileSystem.java:257)
    	at ....adl.AdlFileSystem.initialize(AdlFileSystem.java:164)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    ```
    
    ### How was this patch tested?
    Added UT to `LibSVMRelationSuite`.
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
    (cherry picked from commit 1234c66fa6b6d2c45edb40237788fa3bfdf96cf3)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #29986 from MaxGekk/ml-option-propagation-3.0.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit dcffa56f04a1764619a69789fbf7169ffea2bdd4)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/ml/source/libsvm/LibSVMRelation.scala |  2 +-
 .../main/scala/org/apache/spark/mllib/util/MLUtils.scala   |  6 ++++--
 .../spark/ml/source/libsvm/LibSVMRelationSuite.scala       | 14 ++++++++++++--
 3 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5795812..298adf9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -100,7 +100,7 @@ private[libsvm] class LibSVMFileFormat
         "'numFeatures' option to avoid the extra scan.")
 
       val paths = files.map(_.getPath.toString)
-      val parsed = MLUtils.parseLibSVMFile(sparkSession, paths)
+      val parsed = MLUtils.parseLibSVMFile(sparkSession, paths, options)
       MLUtils.computeNumFeatures(parsed)
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index c8550cd..d9bab3b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -105,13 +105,15 @@ object MLUtils extends Logging {
   }
 
   private[spark] def parseLibSVMFile(
-      sparkSession: SparkSession, paths: Seq[String]): RDD[(Double, Array[Int], Array[Double])] = {
+      sparkSession: SparkSession,
+      paths: Seq[String],
+      options: Map[String, String]): RDD[(Double, Array[Int], Array[Double])] = {
     val lines = sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
         className = classOf[TextFileFormat].getName,
-        options = Map(DataSource.GLOB_PATHS_KEY -> "false")
+        options = options ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
       ).resolveRelation(checkFilesExist = false))
       .select("value")
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index 28c770c..20a968c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -27,12 +27,13 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SaveMode}
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 
-class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
+class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext with SQLHelper {
   // Path for dataset
   var path: String = _
 
@@ -202,4 +203,13 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
     val v = row1.getAs[SparseVector](1)
     assert(v == Vectors.sparse(2, Seq((0, 2.0), (1, 3.0))))
   }
+
+  test("SPARK-33101: should propagate Hadoop config from DS options to underlying file system") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      val df = spark.read.option("ds_option", "value").format("libsvm").load(path)
+      assert(df.columns(0) == "label")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org