Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/27 13:05:27 UTC

[spark] branch branch-3.4 updated: [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 05fd99131de [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
05fd99131de is described below

commit 05fd99131de01d2d346a2e4c48c4bae27af1f875
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Mon Feb 27 09:05:01 2023 -0400

    [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
    
    ### What changes were proposed in this pull request?
    This PR adds the RuntimeConfig class for the Spark Connect Scala Client.
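    
    Each `RuntimeConfig` method builds a `proto.ConfigRequest.Operation` and sends it to the
    server through the new `SparkConnectClient.config` method. As a rough, illustrative sketch
    of that round trip (names follow the code added below; `client` is assumed to be an
    existing `SparkConnectClient`, and `foo.bar` is a placeholder key):
    
        import org.apache.spark.connect.proto.ConfigRequest
    
        // Build a "set" operation carrying a single key-value pair.
        val op = ConfigRequest.Operation.newBuilder()
        op.getSetBuilder.addPairsBuilder().setKey("foo.bar").setValue("true")
        // Dispatch it and surface any warnings returned by the server.
        val response = client.config(op.build())
        response.getWarningsList.forEach(w => println(s"config warning: $w"))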
    
    ### Why are the changes needed?
    API Parity.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds the `SparkSession.conf` / `RuntimeConfig` interface to the Spark Connect Scala client.
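    
    For illustration, a minimal usage sketch of the new interface, assuming a `SparkSession`
    named `spark` that is already connected to a Spark Connect server (the `foo.bar` keys are
    placeholders, mirroring the ClientE2ETestSuite test added below):
    
        // Set, read back and reset an arbitrary runtime property.
        spark.conf.set("foo.bar", value = true)
        assert(spark.conf.get("foo.bar") == "true")            // values round-trip as strings
        assert(spark.conf.getOption("does.not.exist").isEmpty) // missing keys yield None
        spark.conf.unset("foo.bar")
        // Check whether a SQL conf can be changed at runtime.
        val ansiModifiable = spark.conf.isModifiable("spark.sql.ansi.enabled")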
    
    ### How was this patch tested?
    Added tests to the ClientE2ETestSuite.
    
    Closes #40185 from hvanhovell/SPARK-42586.
    
    Authored-by: Herman van Hovell <he...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit a6f28ca7eab25d6cc1e6bcea1dedc70d36c30a61)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../scala/org/apache/spark/sql/RuntimeConfig.scala | 162 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  11 ++
 .../sql/connect/client/SparkConnectClient.scala    |  16 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  19 +++
 .../sql/connect/client/CompatibilitySuite.scala    |  28 +---
 5 files changed, 214 insertions(+), 22 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 00000000000..c16bc034488
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.client.SparkConnectClient
+
+/**
+ * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
+ *
+ * @since 3.4.0
+ */
+class RuntimeConfig(client: SparkConnectClient) extends Logging {
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: String): Unit = {
+    executeConfigRequest { builder =>
+      builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
+    }
+  }
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: Long): Unit = set(key, String.valueOf(value))
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @throws java.util.NoSuchElementException
+   *   if the key is not set and does not have a default value
+   * @since 3.4.0
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  def get(key: String): String = getOption(key).getOrElse {
+    throw new NoSuchElementException(key)
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def get(key: String, default: String): String = {
+    executeConfigRequestSingleValue { builder =>
+      builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
+    }
+  }
+
+  /**
+   * Returns all properties set in this conf.
+   *
+   * @since 3.4.0
+   */
+  def getAll: Map[String, String] = {
+    val response = executeConfigRequest { builder =>
+      builder.getGetAllBuilder
+    }
+    val builder = Map.newBuilder[String, String]
+    response.getPairsList.forEach { kv =>
+      require(kv.hasValue)
+      builder += ((kv.getKey, kv.getValue))
+    }
+    builder.result()
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def getOption(key: String): Option[String] = {
+    val pair = executeConfigRequestSinglePair { builder =>
+      builder.getGetOptionBuilder.addKeys(key)
+    }
+    if (pair.hasValue) {
+      Option(pair.getValue)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Resets the configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def unset(key: String): Unit = {
+    executeConfigRequest { builder =>
+      builder.getUnsetBuilder.addKeys(key)
+    }
+  }
+
+  /**
+   * Indicates whether the configuration property with the given key is modifiable in the current
+   * session.
+   *
+   * @return
+   *   `true` if the configuration property is modifiable. For static SQL configurations,
+   *   Spark Core settings, invalid (non-existent) keys, and other non-modifiable
+   *   configuration properties, the returned value is `false`.
+   * @since 3.4.0
+   */
+  def isModifiable(key: String): Boolean = {
+    val modifiable = executeConfigRequestSingleValue { builder =>
+      builder.getIsModifiableBuilder.addKeys(key)
+    }
+    java.lang.Boolean.valueOf(modifiable)
+  }
+
+  private def executeConfigRequestSingleValue(
+      f: ConfigRequest.Operation.Builder => Unit): String = {
+    val pair = executeConfigRequestSinglePair(f)
+    require(pair.hasValue, "The returned pair does not have a value set")
+    pair.getValue
+  }
+
+  private def executeConfigRequestSinglePair(
+      f: ConfigRequest.Operation.Builder => Unit): KeyValue = {
+    val response = executeConfigRequest(f)
+    require(response.getPairsCount == 1, "The response should contain exactly one key-value pair.")
+    response.getPairs(0)
+  }
+
+  private def executeConfigRequest(f: ConfigRequest.Operation.Builder => Unit): ConfigResponse = {
+    val builder = ConfigRequest.Operation.newBuilder()
+    f(builder)
+    val response = client.config(builder.build())
+    response.getWarningsList.forEach { warning =>
+      logWarning(warning)
+    }
+    response
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e39a6779e25..b1b1f4b0a4e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -63,6 +63,17 @@ class SparkSession(
 
   def version: String = SPARK_VERSION
 
+  /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark configurations that
+   * are relevant to Spark SQL. When getting the value of a config, this defaults to the value
+   * set in the server, if any.
+   *
+   * @since 3.4.0
+   */
+  val conf: RuntimeConfig = new RuntimeConfig(client)
+
   /**
    * Executes some code block and prints to stdout the time taken to execute the block. This is
    * available in Scala only and is used primarily for interactive testing and debugging.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 12bb581880c..8b69f75b201 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -67,6 +67,22 @@ private[sql] class SparkConnectClient(
     stub.executePlan(request)
   }
 
+  /**
+   * Dispatch the [[proto.ConfigRequest]] to the Spark Connect server.
+   * @return
+   *   A [[proto.ConfigResponse]] from the Spark Connect server.
+   */
+  def config(operation: proto.ConfigRequest.Operation): proto.ConfigResponse = {
+    val request = proto.ConfigRequest
+      .newBuilder()
+      .setOperation(operation)
+      .setClientId(sessionId)
+      .setClientType(userAgent)
+      .setUserContext(userContext)
+      .build()
+    stub.config(request)
+  }
+
   /**
    * Builds a [[proto.AnalyzePlanRequest]] from `plan` and dispatches it to the Spark Connect
    * server.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 33e9d0756c1..122e7d5d271 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -427,4 +427,23 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     val timeFragments = Seq("Time taken: ", " ms")
     testCapturedStdOut(spark.time(spark.sql("select 1").collect()), timeFragments: _*)
   }
+
+  test("RuntimeConfig") {
+    intercept[NoSuchElementException](spark.conf.get("foo.bar"))
+    assert(spark.conf.getOption("foo.bar").isEmpty)
+    spark.conf.set("foo.bar", value = true)
+    assert(spark.conf.getOption("foo.bar") === Option("true"))
+    spark.conf.set("foo.bar.numBaz", 100L)
+    assert(spark.conf.get("foo.bar.numBaz") === "100")
+    spark.conf.set("foo.bar.name", "donkey")
+    assert(spark.conf.get("foo.bar.name") === "donkey")
+    spark.conf.unset("foo.bar.name")
+    val allKeyValues = spark.conf.getAll
+    assert(allKeyValues("foo.bar") === "true")
+    assert(allKeyValues("foo.bar.numBaz") === "100")
+    assert(!spark.conf.isModifiable("foo.bar")) // This is a bit odd.
+    assert(spark.conf.isModifiable("spark.sql.ansi.enabled"))
+    assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
+    intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 2c5ea027bb7..5546542898e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -80,7 +80,8 @@ class CompatibilitySuite extends ConnectFunSuite {
       IncludeByName("org.apache.spark.sql.Dataset.*"),
       IncludeByName("org.apache.spark.sql.functions.*"),
       IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
-      IncludeByName("org.apache.spark.sql.SparkSession.*"))
+      IncludeByName("org.apache.spark.sql.SparkSession.*"),
+      IncludeByName("org.apache.spark.sql.RuntimeConfig.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
       // Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -101,15 +102,11 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
@@ -125,24 +122,11 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.Dataset.registerTempTable"
-      ), // deprecated
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),
 
       // functions
@@ -170,11 +154,9 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
@@ -189,7 +171,6 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"),
@@ -198,7 +179,10 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"),
+
+      // RuntimeConfig
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"))
     val problems = allProblems
       .filter { p =>
         includedRules.exists(rule => rule(p))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org