You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/10 23:39:15 UTC
[spark] branch branch-3.4 updated: [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new d79d10291c6 [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel
d79d10291c6 is described below
commit d79d10291c686377468d7f1bf46f866a243d5551
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Fri Mar 10 19:38:19 2023 -0400
[SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel
### What changes were proposed in this pull request?
SparkSession created by newSession should not share the channel. This is because `stop` might be called on a SparkSession, which shuts down the channel it uses. If the channel were shared, other still-active SparkSessions sharing that channel would get into trouble.
### Why are the changes needed?
This fixes the issue where stopping one SparkSession caused other active SparkSessions to stop working in Spark Connect.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT
Closes #40346 from amaliujia/rw-session-do-not-share-channel.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit e5f56e51dcbffb1f79dc00e8493e946ce1209cdc)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../spark/sql/connect/client/SparkConnectClient.scala | 16 +++++++++-------
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
.../org/apache/spark/sql/PlanGenerationTestSuite.scala | 2 +-
.../org/apache/spark/sql/SQLImplicitsTestSuite.scala | 2 +-
4 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 348fc94bb89..736a8af8e38 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.connect.client
-import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, ManagedChannelBuilder, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
import java.net.URI
import java.util.UUID
import java.util.concurrent.Executor
+import scala.language.existentials
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.UserContext
@@ -31,9 +32,11 @@ import org.apache.spark.sql.connect.common.config.ConnectCommon
*/
private[sql] class SparkConnectClient(
private val userContext: proto.UserContext,
- private val channel: ManagedChannel,
+ private val channelBuilder: ManagedChannelBuilder[_],
private[client] val userAgent: String) {
+ private[this] lazy val channel: ManagedChannel = channelBuilder.build()
+
private[this] val stub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
private[client] val artifactManager: ArtifactManager = new ArtifactManager(userContext, channel)
@@ -164,7 +167,7 @@ private[sql] class SparkConnectClient(
}
def copy(): SparkConnectClient = {
- new SparkConnectClient(userContext, channel, userAgent)
+ new SparkConnectClient(userContext, channelBuilder, userAgent)
}
/**
@@ -208,8 +211,8 @@ private[sql] object SparkConnectClient {
"Either remove 'token' or set 'use_ssl=true'"
// for internal tests
- def apply(userContext: UserContext, channel: ManagedChannel): SparkConnectClient =
- new SparkConnectClient(userContext, channel, DEFAULT_USER_AGENT)
+ def apply(userContext: UserContext, builder: ManagedChannelBuilder[_]): SparkConnectClient =
+ new SparkConnectClient(userContext, builder, DEFAULT_USER_AGENT)
def builder(): Builder = new Builder()
@@ -394,10 +397,9 @@ private[sql] object SparkConnectClient {
if (metadata.nonEmpty) {
channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
}
- val channel: ManagedChannel = channelBuilder.build()
new SparkConnectClient(
userContextBuilder.build(),
- channel,
+ channelBuilder,
userAgent.getOrElse(DEFAULT_USER_AGENT))
}
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 43b0cd2674c..42376db880b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -41,7 +41,7 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
private def newSparkSession(): SparkSession = {
val client = new SparkConnectClient(
proto.UserContext.newBuilder().build(),
- InProcessChannelBuilder.forName(getClass.getName).directExecutor().build(),
+ InProcessChannelBuilder.forName(getClass.getName).directExecutor(),
"test")
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 0d295d17296..3c7e1fdeee6 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -97,7 +97,7 @@ class PlanGenerationTestSuite
super.beforeAll()
val client = SparkConnectClient(
proto.UserContext.newBuilder().build(),
- InProcessChannelBuilder.forName("/dev/null").build())
+ InProcessChannelBuilder.forName("/dev/null"))
session =
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
index 3fcc135a22e..f3261ac4850 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
@@ -38,7 +38,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with BeforeAndAfterAll {
super.beforeAll()
val client = SparkConnectClient(
proto.UserContext.newBuilder().build(),
- InProcessChannelBuilder.forName("/dev/null").build())
+ InProcessChannelBuilder.forName("/dev/null"))
session =
new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org