You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/27 13:05:27 UTC
[spark] branch branch-3.4 updated: [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 05fd99131de [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
05fd99131de is described below
commit 05fd99131de01d2d346a2e4c48c4bae27af1f875
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Mon Feb 27 09:05:01 2023 -0400
[SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
### What changes were proposed in this pull request?
This PR adds the RuntimeConfig class for the Spark Connect Scala Client.
### Why are the changes needed?
API parity with the existing Spark SQL `RuntimeConfig` API.
### Does this PR introduce _any_ user-facing change?
Yes. Users of the Scala client can now get and set runtime configuration through `SparkSession.conf`.
### How was this patch tested?
Added tests to the ClientE2ETestSuite.
Closes #40185 from hvanhovell/SPARK-42586.
Authored-by: Herman van Hovell <he...@databricks.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit a6f28ca7eab25d6cc1e6bcea1dedc70d36c30a61)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../scala/org/apache/spark/sql/RuntimeConfig.scala | 162 +++++++++++++++++++++
.../scala/org/apache/spark/sql/SparkSession.scala | 11 ++
.../sql/connect/client/SparkConnectClient.scala | 16 ++
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 19 +++
.../sql/connect/client/CompatibilitySuite.scala | 28 +---
5 files changed, 214 insertions(+), 22 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 00000000000..c16bc034488
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.client.SparkConnectClient
+
+/**
+ * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
+ *
+ * @since 3.4.0
+ */
+class RuntimeConfig(client: SparkConnectClient) extends Logging {
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 3.4.0
+ */
+ def set(key: String, value: String): Unit = {
+ executeConfigRequest { builder =>
+ builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
+ }
+ }
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 3.4.0
+ */
+ def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))
+
+ /**
+ * Sets the given Spark runtime configuration property.
+ *
+ * @since 3.4.0
+ */
+ def set(key: String, value: Long): Unit = set(key, String.valueOf(value))
+
+ /**
+ * Returns the value of Spark runtime configuration property for the given key.
+ *
+ * @throws java.util.NoSuchElementException
+ * if the key is not set and does not have a default value
+ * @since 3.4.0
+ */
+ @throws[NoSuchElementException]("if the key is not set")
+ def get(key: String): String = getOption(key).getOrElse {
+ throw new NoSuchElementException(key)
+ }
+
+ /**
+ * Returns the value of the given runtime configuration property, or `default` if it is unset.
+ *
+ * @since 3.4.0
+ */
+ def get(key: String, default: String): String = {
+ executeConfigRequestSingleValue { builder =>
+ builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
+ }
+ }
+
+ /**
+ * Returns all properties set in this conf.
+ *
+ * @since 3.4.0
+ */
+ def getAll: Map[String, String] = {
+ val response = executeConfigRequest { builder =>
+ builder.getGetAllBuilder
+ }
+ val builder = Map.newBuilder[String, String]
+ response.getPairsList.forEach { kv =>
+ require(kv.hasValue, s"No value returned for config key '${kv.getKey}'")
+ builder += ((kv.getKey, kv.getValue))
+ }
+ builder.result()
+ }
+
+ /**
+ * Returns the value of the given runtime configuration property, or `None` if it is unset.
+ *
+ * @since 3.4.0
+ */
+ def getOption(key: String): Option[String] = {
+ val pair = executeConfigRequestSinglePair { builder =>
+ builder.getGetOptionBuilder.addKeys(key)
+ }
+ if (pair.hasValue) {
+ Option(pair.getValue)
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Resets the configuration property for the given key.
+ *
+ * @since 3.4.0
+ */
+ def unset(key: String): Unit = {
+ executeConfigRequest { builder =>
+ builder.getUnsetBuilder.addKeys(key)
+ }
+ }
+
+ /**
+ * Indicates whether the configuration property with the given key is modifiable in the current
+ * session.
+ *
+ * @return
+ * `true` if the configuration property is modifiable. For static SQL, Spark Core, invalid
+ * (not existing) and other non-modifiable configuration properties, the returned value is
+ * `false`.
+ * @since 3.4.0
+ */
+ def isModifiable(key: String): Boolean = {
+ val modifiable = executeConfigRequestSingleValue { builder =>
+ builder.getIsModifiableBuilder.addKeys(key)
+ }
+ java.lang.Boolean.parseBoolean(modifiable)
+ }
+
+ private def executeConfigRequestSingleValue(
+ f: ConfigRequest.Operation.Builder => Unit): String = {
+ val pair = executeConfigRequestSinglePair(f)
+ require(pair.hasValue, "The returned pair does not have a value set")
+ pair.getValue
+ }
+
+ private def executeConfigRequestSinglePair(
+ f: ConfigRequest.Operation.Builder => Unit): KeyValue = {
+ val response = executeConfigRequest(f)
+ require(response.getPairsCount == 1, s"Expected exactly 1 pair, got ${response.getPairsCount}")
+ response.getPairs(0)
+ }
+
+ private def executeConfigRequest(f: ConfigRequest.Operation.Builder => Unit): ConfigResponse = {
+ val builder = ConfigRequest.Operation.newBuilder()
+ f(builder)
+ val response = client.config(builder.build())
+ response.getWarningsList.forEach { warning =>
+ logWarning(warning)
+ }
+ response
+ }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e39a6779e25..b1b1f4b0a4e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -63,6 +63,17 @@ class SparkSession(
def version: String = SPARK_VERSION
+ /**
+ * Runtime configuration interface for Spark.
+ *
+ * This is the interface through which the user can get and set all Spark configurations that
+ * are relevant to Spark SQL. When getting the value of a config, this defaults to the value set
+ * on the server, if any.
+ *
+ * @since 3.4.0
+ */
+ val conf: RuntimeConfig = new RuntimeConfig(client)
+
/**
* Executes some code block and prints to stdout the time taken to execute the block. This is
* available in Scala only and is used primarily for interactive testing and debugging.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 12bb581880c..8b69f75b201 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -67,6 +67,22 @@ private[sql] class SparkConnectClient(
stub.executePlan(request)
}
+ /**
+ * Sends `operation` to the Spark Connect server, wrapped in a [[proto.ConfigRequest]].
+ * @param operation
+ * the configuration operation to execute on the server.
+ * @return
+ * The [[proto.ConfigResponse]] returned by the Spark Connect server.
+ */
+ def config(operation: proto.ConfigRequest.Operation): proto.ConfigResponse = {
+ val requestBuilder = proto.ConfigRequest.newBuilder()
+ requestBuilder.setOperation(operation)
+ requestBuilder.setClientId(sessionId)
+ requestBuilder.setClientType(userAgent)
+ requestBuilder.setUserContext(userContext)
+ stub.config(requestBuilder.build())
+ }
+
/**
* Builds a [[proto.AnalyzePlanRequest]] from `plan` and dispatched it to the Spark Connect
* server.
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 33e9d0756c1..122e7d5d271 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -427,4 +427,23 @@ class ClientE2ETestSuite extends RemoteSparkSession {
val timeFragments = Seq("Time taken: ", " ms")
testCapturedStdOut(spark.time(spark.sql("select 1").collect()), timeFragments: _*)
}
+
+ test("RuntimeConfig") {
+ intercept[NoSuchElementException](spark.conf.get("foo.bar"))
+ assert(spark.conf.getOption("foo.bar").isEmpty)
+ spark.conf.set("foo.bar", value = true)
+ assert(spark.conf.getOption("foo.bar") === Option("true"))
+ spark.conf.set("foo.bar.numBaz", 100L)
+ assert(spark.conf.get("foo.bar.numBaz") === "100")
+ spark.conf.set("foo.bar.name", "donkey")
+ assert(spark.conf.get("foo.bar.name") === "donkey")
+ spark.conf.unset("foo.bar.name")
+ val allKeyValues = spark.conf.getAll
+ val fooBarConfs = allKeyValues.filter { case (key, _) => key.startsWith("foo.bar") }
+ assert(fooBarConfs === Map("foo.bar" -> "true", "foo.bar.numBaz" -> "100")) // name was unset
+ assert(!spark.conf.isModifiable("foo.bar")) // Non-existing (unregistered) confs report false.
+ assert(spark.conf.isModifiable("spark.sql.ansi.enabled"))
+ assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
+ intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
+ }
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 2c5ea027bb7..5546542898e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -80,7 +80,8 @@ class CompatibilitySuite extends ConnectFunSuite {
IncludeByName("org.apache.spark.sql.Dataset.*"),
IncludeByName("org.apache.spark.sql.functions.*"),
IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
- IncludeByName("org.apache.spark.sql.SparkSession.*"))
+ IncludeByName("org.apache.spark.sql.SparkSession.*"),
+ IncludeByName("org.apache.spark.sql.RuntimeConfig.*"))
val excludeRules = Seq(
// Filter unsupported rules:
// Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -101,15 +102,11 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
@@ -125,24 +122,11 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.Dataset.registerTempTable"
- ), // deprecated
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
- ProblemFilters.exclude[Problem](
- "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),
// functions
@@ -170,11 +154,9 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
@@ -189,7 +171,6 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"),
@@ -198,7 +179,10 @@ class CompatibilitySuite extends ConnectFunSuite {
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
- ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"),
+
+ // RuntimeConfig
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"))
val problems = allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org