You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/10 23:39:15 UTC

[spark] branch branch-3.4 updated: [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d79d10291c6 [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel
d79d10291c6 is described below

commit d79d10291c686377468d7f1bf46f866a243d5551
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Fri Mar 10 19:38:19 2023 -0400

    [SPARK-42667][CONNECT][FOLLOW-UP] SparkSession created by newSession should not share the channel
    
    ### What changes were proposed in this pull request?
    
    A SparkSession created by `newSession` should not share the channel with the session it was created from. Calling `stop` on a SparkSession shuts down the channel it uses, so if the channel is shared, every other SparkSession that has not been stopped but still uses that channel will run into trouble.
    
    ### Why are the changes needed?
    
    This fixes an issue in Spark Connect where stopping one SparkSession causes other active SparkSessions to stop working.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing UT
    
    Closes #40346 from amaliujia/rw-session-do-not-share-channel.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit e5f56e51dcbffb1f79dc00e8493e946ce1209cdc)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../spark/sql/connect/client/SparkConnectClient.scala    | 16 +++++++++-------
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala   |  2 +-
 .../org/apache/spark/sql/PlanGenerationTestSuite.scala   |  2 +-
 .../org/apache/spark/sql/SQLImplicitsTestSuite.scala     |  2 +-
 4 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 348fc94bb89..736a8af8e38 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.connect.client
 
-import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
+import io.grpc.{CallCredentials, CallOptions, Channel, ClientCall, ClientInterceptor, CompositeChannelCredentials, ForwardingClientCall, Grpc, InsecureChannelCredentials, ManagedChannel, ManagedChannelBuilder, Metadata, MethodDescriptor, Status, TlsChannelCredentials}
 import java.net.URI
 import java.util.UUID
 import java.util.concurrent.Executor
+import scala.language.existentials
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.UserContext
@@ -31,9 +32,11 @@ import org.apache.spark.sql.connect.common.config.ConnectCommon
  */
 private[sql] class SparkConnectClient(
     private val userContext: proto.UserContext,
-    private val channel: ManagedChannel,
+    private val channelBuilder: ManagedChannelBuilder[_],
     private[client] val userAgent: String) {
 
+  private[this] lazy val channel: ManagedChannel = channelBuilder.build()
+
   private[this] val stub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)
 
   private[client] val artifactManager: ArtifactManager = new ArtifactManager(userContext, channel)
@@ -164,7 +167,7 @@ private[sql] class SparkConnectClient(
   }
 
   def copy(): SparkConnectClient = {
-    new SparkConnectClient(userContext, channel, userAgent)
+    new SparkConnectClient(userContext, channelBuilder, userAgent)
   }
 
   /**
@@ -208,8 +211,8 @@ private[sql] object SparkConnectClient {
       "Either remove 'token' or set 'use_ssl=true'"
 
   // for internal tests
-  def apply(userContext: UserContext, channel: ManagedChannel): SparkConnectClient =
-    new SparkConnectClient(userContext, channel, DEFAULT_USER_AGENT)
+  def apply(userContext: UserContext, builder: ManagedChannelBuilder[_]): SparkConnectClient =
+    new SparkConnectClient(userContext, builder, DEFAULT_USER_AGENT)
 
   def builder(): Builder = new Builder()
 
@@ -394,10 +397,9 @@ private[sql] object SparkConnectClient {
       if (metadata.nonEmpty) {
         channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
       }
-      val channel: ManagedChannel = channelBuilder.build()
       new SparkConnectClient(
         userContextBuilder.build(),
-        channel,
+        channelBuilder,
         userAgent.getOrElse(DEFAULT_USER_AGENT))
     }
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 43b0cd2674c..42376db880b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -41,7 +41,7 @@ class DatasetSuite extends ConnectFunSuite with BeforeAndAfterEach {
   private def newSparkSession(): SparkSession = {
     val client = new SparkConnectClient(
       proto.UserContext.newBuilder().build(),
-      InProcessChannelBuilder.forName(getClass.getName).directExecutor().build(),
+      InProcessChannelBuilder.forName(getClass.getName).directExecutor(),
       "test")
     new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 0d295d17296..3c7e1fdeee6 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -97,7 +97,7 @@ class PlanGenerationTestSuite
     super.beforeAll()
     val client = SparkConnectClient(
       proto.UserContext.newBuilder().build(),
-      InProcessChannelBuilder.forName("/dev/null").build())
+      InProcessChannelBuilder.forName("/dev/null"))
     session =
       new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
index 3fcc135a22e..f3261ac4850 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLImplicitsTestSuite.scala
@@ -38,7 +38,7 @@ class SQLImplicitsTestSuite extends ConnectFunSuite with BeforeAndAfterAll {
     super.beforeAll()
     val client = SparkConnectClient(
       proto.UserContext.newBuilder().build(),
-      InProcessChannelBuilder.forName("/dev/null").build())
+      InProcessChannelBuilder.forName("/dev/null"))
     session =
       new SparkSession(client, cleaner = SparkSession.cleaner, planIdGenerator = new AtomicLong)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org