You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/29 16:25:30 UTC

[spark] branch master updated: [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c9734008401 [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID
c9734008401 is described below

commit c9734008401ce7adfb154cda5496c808b2d76580
Author: Raghu Angadi <ra...@databricks.com>
AuthorDate: Thu Jun 29 09:25:15 2023 -0700

    [SPARK-43474][SS][CONNECT] Add a spark connect access to runtime Dataframes by ID
    
    [This is a continuation of #41146, to change the author of the PR. Retains the description.]
    
    ### What changes were proposed in this pull request?
    
    This change adds a new spark connect relation type `CachedRemoteRelation`, which can represent a DataFrame that's been cached on the server side.
    
    On the server side, each `SessionHolder` has a cache that maintains a mapping from DataFrame ID to the actual DataFrame.
    
    On the client side, a new relation type and function are added. The new function can create a DataFrame reference given a key. The key is the id of a cached DataFrame, which is usually passed from the server to the client. When transforming the DataFrame reference, the server finds the actual DataFrame in the cache and substitutes it for the reference.
    
    One use case of this function will be streaming foreachBatch(). The server needs to call a user function for every batch, and that function takes a DataFrame as an argument. With the new function, we can cache the DataFrame on the server and pass the id back to the client, which can then create the DataFrame reference.
    
    ### Why are the changes needed?
    
    This change is needed to support streaming foreachBatch() in Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Scala unit test.
    Manual test.
    (More end-to-end tests will be added when foreachBatch() is supported. Currently there is no way to add a DataFrame to the server cache using Python.)
    
    Closes #41580 from rangadi/df-ref.
    
    Authored-by: Raghu Angadi <ra...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../main/protobuf/spark/connect/relations.proto    |   7 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |   8 +
 .../spark/sql/connect/service/SessionHolder.scala  |  32 +++
 .../service/SparkConnectSessionHodlerSuite.scala   |  82 +++++++
 python/pyspark/sql/connect/plan.py                 |  14 ++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 270 +++++++++++----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  28 +++
 7 files changed, 313 insertions(+), 128 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index d29ab02f86a..29405a1332b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -70,6 +70,7 @@ message Relation {
     ApplyInPandasWithState apply_in_pandas_with_state = 34;
     HtmlString html_string = 35;
     CachedLocalRelation cached_local_relation = 36;
+    CachedRemoteRelation cached_remote_relation = 37;
 
     // NA functions
     NAFill fill_na = 90;
@@ -398,6 +399,12 @@ message CachedLocalRelation {
   string hash = 3;
 }
 
+// Represents a remote relation that has been cached on server.
+message CachedRemoteRelation {
+  // (Required) ID of the remote relation (assigned by the service).
+  string relation_id = 1;
+}
+
 // Relation of type [[Sample]] that samples a fraction of the dataset.
 message Sample {
   // (Required) Input relation for a Sample.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cecf14a7045..cdad4fc6190 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -162,6 +162,8 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         transformCoGroupMap(rel.getCoGroupMap)
       case proto.Relation.RelTypeCase.APPLY_IN_PANDAS_WITH_STATE =>
         transformApplyInPandasWithState(rel.getApplyInPandasWithState)
+      case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION =>
+        transformCachedRemoteRelation(rel.getCachedRemoteRelation)
       case proto.Relation.RelTypeCase.COLLECT_METRICS =>
         transformCollectMetrics(rel.getCollectMetrics)
       case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
@@ -897,6 +899,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       .logicalPlan
   }
 
+  /** Resolves a cached remote relation to the logical plan of the session's cached DataFrame. */
+  private def transformCachedRemoteRelation(rel: proto.CachedRemoteRelation): LogicalPlan = {
+    val cachedDf = sessionHolder.getDataFrameOrThrow(rel.getRelationId)
+    cachedDf.logicalPlan
+  }
+
   private def transformWithColumnsRenamed(rel: proto.WithColumnsRenamed): LogicalPlan = {
     Dataset
       .ofRows(session, transformRelation(rel.getInput))
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 00432209779..24502fccd96 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -28,10 +28,13 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.JobArtifactSet
+import org.apache.spark.SparkException
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.util.Utils
 
 /**
@@ -43,6 +46,10 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   val executePlanOperations: ConcurrentMap[String, ExecutePlanHolder] =
     new ConcurrentHashMap[String, ExecutePlanHolder]()
 
+  // Mapping from relation ID (passed to client) to runtime dataframe. Used for callbacks like
+  // foreachBatch() in Streaming. Lazy since most sessions don't need it.
+  private lazy val dataFrameCache: ConcurrentMap[String, DataFrame] = new ConcurrentHashMap()
+
   private[connect] def createExecutePlanHolder(
       request: proto.ExecutePlanRequest): ExecutePlanHolder = {
 
@@ -163,6 +170,31 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
       }
     }
   }
+
+  /**
+   * Caches `df` under `dfId`; throws an internal [[SparkException]] if the ID is already taken.
+   * Entries never expire: the DataFrame's owner must remove it once it is no longer needed.
+   */
+  private[connect] def cacheDataFrameById(dfId: String, df: DataFrame): Unit = {
+    // putIfAbsent returns the previous mapping (non-null) when the ID is already in use; the
+    // existing entry is left untouched in that case.
+    if (dataFrameCache.putIfAbsent(dfId, df) != null) {
+      // internalError only builds the exception, so it must be thrown explicitly.
+      throw SparkException.internalError(s"A dataframe is already associated with id $dfId")
+    }
+  }
+
+  /**
+   * Looks up the [[DataFrame]] cached under `dfId` in this session. Throws [[InvalidPlanInput]]
+   * when no DataFrame is cached for that ID.
+   */
+  private[connect] def getDataFrameOrThrow(dfId: String): DataFrame =
+    Option(dataFrameCache.get(dfId)) match {
+      case Some(cachedDf) => cachedDf
+      case None =>
+        throw InvalidPlanInput(s"No DataFrame with id $dfId is found in the session $sessionId")
+    }
+
+  /** Drops the cache entry for `dfId`; returns the removed DataFrame, or null if none existed. */
+  private[connect] def removeCachedDataFrame(dfId: String): DataFrame =
+    dataFrameCache.remove(dfId)
 }
 
 object SessionHolder {
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
new file mode 100644
index 00000000000..51b78886819
--- /dev/null
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import org.apache.spark.sql.connect.common.InvalidPlanInput
+import org.apache.spark.sql.test.SharedSparkSession
+
+/** Tests for the DataFrame-by-ID cache exposed by [[SessionHolder]]. */
+class SparkConnectSessionHolderSuite extends SharedSparkSession {
+
+  test("DataFrame cache: Successful put and get") {
+    val holder = SessionHolder.forTesting(spark)
+    import holder.session.implicits._
+
+    // A cached DataFrame must be returned for the ID it was registered under.
+    val firstDf = Seq(("k1", "v1"), ("k2", "v2"), ("k3", "v3")).toDF()
+    val firstId = "df_id_1"
+    holder.cacheDataFrameById(firstId, firstDf)
+    assert(holder.getDataFrameOrThrow(firstId) == firstDf)
+
+    // A second entry under a different ID is retrievable as well.
+    val secondDf = Seq(("k4", "v4"), ("k5", "v5")).toDF()
+    val secondId = "df_id_2"
+    holder.cacheDataFrameById(secondId, secondDf)
+    assert(holder.getDataFrameOrThrow(secondId) == secondDf)
+  }
+
+  test("DataFrame cache: Should throw when dataframe is not found") {
+    val holder = SessionHolder.forTesting(spark)
+    import holder.session.implicits._
+
+    val cachedId = "key_1"
+
+    // Nothing has been cached yet, so the lookup must fail.
+    assertThrows[InvalidPlanInput] {
+      holder.getDataFrameOrThrow(cachedId)
+    }
+
+    // Once the DataFrame is cached, the same lookup succeeds ...
+    val df = Seq(("k1", "v1"), ("k2", "v2"), ("k3", "v3")).toDF()
+    holder.cacheDataFrameById(cachedId, df)
+    holder.getDataFrameOrThrow(cachedId)
+
+    // ... while an ID that was never cached still fails.
+    val missingId = "key_2"
+    assertThrows[InvalidPlanInput] {
+      holder.getDataFrameOrThrow(missingId)
+    }
+  }
+
+  test("DataFrame cache: Remove cache and then get should fail") {
+    val holder = SessionHolder.forTesting(spark)
+    import holder.session.implicits._
+
+    // Cache a DataFrame and confirm it can be looked up.
+    val cachedId = "key_1"
+    val df = Seq(("k1", "v1"), ("k2", "v2"), ("k3", "v3")).toDF()
+    holder.cacheDataFrameById(cachedId, df)
+    holder.getDataFrameOrThrow(cachedId)
+
+    // After removal the ID must no longer resolve.
+    holder.removeCachedDataFrame(cachedId)
+    assertThrows[InvalidPlanInput] {
+      holder.getDataFrameOrThrow(cachedId)
+    }
+  }
+}
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index fabab98d9b2..97348d4863e 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -551,6 +551,20 @@ class WithWatermark(LogicalPlan):
         return plan
 
 
+class CachedRemoteRelation(LogicalPlan):
+    """Logical plan node referring to a DataFrame that is cached on the server under a given
+    relation id."""
+
+    def __init__(self, relationId: str):
+        # The base class is initialised with None (presumably: no child plan; confirm
+        # against LogicalPlan.__init__).
+        super().__init__(None)
+        self._relationId = relationId
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        """Build the proto relation whose ``cached_remote_relation.relation_id`` is set to
+        this node's relation id."""
+        relation = self._create_proto_relation()
+        relation.cached_remote_relation.relation_id = self._relationId
+        return relation
+
+
 class Hint(LogicalPlan):
     """Logical plan object for a Hint operation."""
 
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index d175dedc1a2..288bbe084c1 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xf3\x16\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xd0\x17\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 
@@ -64,6 +64,7 @@ _DROP = DESCRIPTOR.message_types_by_name["Drop"]
 _DEDUPLICATE = DESCRIPTOR.message_types_by_name["Deduplicate"]
 _LOCALRELATION = DESCRIPTOR.message_types_by_name["LocalRelation"]
 _CACHEDLOCALRELATION = DESCRIPTOR.message_types_by_name["CachedLocalRelation"]
+_CACHEDREMOTERELATION = DESCRIPTOR.message_types_by_name["CachedRemoteRelation"]
 _SAMPLE = DESCRIPTOR.message_types_by_name["Sample"]
 _RANGE = DESCRIPTOR.message_types_by_name["Range"]
 _SUBQUERYALIAS = DESCRIPTOR.message_types_by_name["SubqueryAlias"]
@@ -364,6 +365,17 @@ CachedLocalRelation = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(CachedLocalRelation)
 
+CachedRemoteRelation = _reflection.GeneratedProtocolMessageType(
+    "CachedRemoteRelation",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _CACHEDREMOTERELATION,
+        "__module__": "spark.connect.relations_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CachedRemoteRelation)
+    },
+)
+_sym_db.RegisterMessage(CachedRemoteRelation)
+
 Sample = _reflection.GeneratedProtocolMessageType(
     "Sample",
     (_message.Message,),
@@ -772,131 +784,133 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _PARSE_OPTIONSENTRY._options = None
     _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
     _RELATION._serialized_start = 165
-    _RELATION._serialized_end = 3096
-    _UNKNOWN._serialized_start = 3098
-    _UNKNOWN._serialized_end = 3107
-    _RELATIONCOMMON._serialized_start = 3109
-    _RELATIONCOMMON._serialized_end = 3200
-    _SQL._serialized_start = 3203
-    _SQL._serialized_end = 3434
-    _SQL_ARGSENTRY._serialized_start = 3344
-    _SQL_ARGSENTRY._serialized_end = 3434
-    _READ._serialized_start = 3437
-    _READ._serialized_end = 4100
-    _READ_NAMEDTABLE._serialized_start = 3615
-    _READ_NAMEDTABLE._serialized_end = 3807
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 3749
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 3807
-    _READ_DATASOURCE._serialized_start = 3810
-    _READ_DATASOURCE._serialized_end = 4087
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3749
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3807
-    _PROJECT._serialized_start = 4102
-    _PROJECT._serialized_end = 4219
-    _FILTER._serialized_start = 4221
-    _FILTER._serialized_end = 4333
-    _JOIN._serialized_start = 4336
-    _JOIN._serialized_end = 4807
-    _JOIN_JOINTYPE._serialized_start = 4599
-    _JOIN_JOINTYPE._serialized_end = 4807
-    _SETOPERATION._serialized_start = 4810
-    _SETOPERATION._serialized_end = 5289
-    _SETOPERATION_SETOPTYPE._serialized_start = 5126
-    _SETOPERATION_SETOPTYPE._serialized_end = 5240
-    _LIMIT._serialized_start = 5291
-    _LIMIT._serialized_end = 5367
-    _OFFSET._serialized_start = 5369
-    _OFFSET._serialized_end = 5448
-    _TAIL._serialized_start = 5450
-    _TAIL._serialized_end = 5525
-    _AGGREGATE._serialized_start = 5528
-    _AGGREGATE._serialized_end = 6110
-    _AGGREGATE_PIVOT._serialized_start = 5867
-    _AGGREGATE_PIVOT._serialized_end = 5978
-    _AGGREGATE_GROUPTYPE._serialized_start = 5981
-    _AGGREGATE_GROUPTYPE._serialized_end = 6110
-    _SORT._serialized_start = 6113
-    _SORT._serialized_end = 6273
-    _DROP._serialized_start = 6276
-    _DROP._serialized_end = 6417
-    _DEDUPLICATE._serialized_start = 6420
-    _DEDUPLICATE._serialized_end = 6660
-    _LOCALRELATION._serialized_start = 6662
-    _LOCALRELATION._serialized_end = 6751
-    _CACHEDLOCALRELATION._serialized_start = 6753
-    _CACHEDLOCALRELATION._serialized_end = 6848
-    _SAMPLE._serialized_start = 6851
-    _SAMPLE._serialized_end = 7124
-    _RANGE._serialized_start = 7127
-    _RANGE._serialized_end = 7272
-    _SUBQUERYALIAS._serialized_start = 7274
-    _SUBQUERYALIAS._serialized_end = 7388
-    _REPARTITION._serialized_start = 7391
-    _REPARTITION._serialized_end = 7533
-    _SHOWSTRING._serialized_start = 7536
-    _SHOWSTRING._serialized_end = 7678
-    _HTMLSTRING._serialized_start = 7680
-    _HTMLSTRING._serialized_end = 7794
-    _STATSUMMARY._serialized_start = 7796
-    _STATSUMMARY._serialized_end = 7888
-    _STATDESCRIBE._serialized_start = 7890
-    _STATDESCRIBE._serialized_end = 7971
-    _STATCROSSTAB._serialized_start = 7973
-    _STATCROSSTAB._serialized_end = 8074
-    _STATCOV._serialized_start = 8076
-    _STATCOV._serialized_end = 8172
-    _STATCORR._serialized_start = 8175
-    _STATCORR._serialized_end = 8312
-    _STATAPPROXQUANTILE._serialized_start = 8315
-    _STATAPPROXQUANTILE._serialized_end = 8479
-    _STATFREQITEMS._serialized_start = 8481
-    _STATFREQITEMS._serialized_end = 8606
-    _STATSAMPLEBY._serialized_start = 8609
-    _STATSAMPLEBY._serialized_end = 8918
-    _STATSAMPLEBY_FRACTION._serialized_start = 8810
-    _STATSAMPLEBY_FRACTION._serialized_end = 8909
-    _NAFILL._serialized_start = 8921
-    _NAFILL._serialized_end = 9055
-    _NADROP._serialized_start = 9058
-    _NADROP._serialized_end = 9192
-    _NAREPLACE._serialized_start = 9195
-    _NAREPLACE._serialized_end = 9491
-    _NAREPLACE_REPLACEMENT._serialized_start = 9350
-    _NAREPLACE_REPLACEMENT._serialized_end = 9491
-    _TODF._serialized_start = 9493
-    _TODF._serialized_end = 9581
-    _WITHCOLUMNSRENAMED._serialized_start = 9584
-    _WITHCOLUMNSRENAMED._serialized_end = 9823
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9756
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9823
-    _WITHCOLUMNS._serialized_start = 9825
-    _WITHCOLUMNS._serialized_end = 9944
-    _WITHWATERMARK._serialized_start = 9947
-    _WITHWATERMARK._serialized_end = 10081
-    _HINT._serialized_start = 10084
-    _HINT._serialized_end = 10216
-    _UNPIVOT._serialized_start = 10219
-    _UNPIVOT._serialized_end = 10546
-    _UNPIVOT_VALUES._serialized_start = 10476
-    _UNPIVOT_VALUES._serialized_end = 10535
-    _TOSCHEMA._serialized_start = 10548
-    _TOSCHEMA._serialized_end = 10654
-    _REPARTITIONBYEXPRESSION._serialized_start = 10657
-    _REPARTITIONBYEXPRESSION._serialized_end = 10860
-    _MAPPARTITIONS._serialized_start = 10863
-    _MAPPARTITIONS._serialized_end = 11044
-    _GROUPMAP._serialized_start = 11047
-    _GROUPMAP._serialized_end = 11682
-    _COGROUPMAP._serialized_start = 11685
-    _COGROUPMAP._serialized_end = 12211
-    _APPLYINPANDASWITHSTATE._serialized_start = 12214
-    _APPLYINPANDASWITHSTATE._serialized_end = 12571
-    _COLLECTMETRICS._serialized_start = 12574
-    _COLLECTMETRICS._serialized_end = 12710
-    _PARSE._serialized_start = 12713
-    _PARSE._serialized_end = 13101
-    _PARSE_OPTIONSENTRY._serialized_start = 3749
-    _PARSE_OPTIONSENTRY._serialized_end = 3807
-    _PARSE_PARSEFORMAT._serialized_start = 13002
-    _PARSE_PARSEFORMAT._serialized_end = 13090
+    _RELATION._serialized_end = 3189
+    _UNKNOWN._serialized_start = 3191
+    _UNKNOWN._serialized_end = 3200
+    _RELATIONCOMMON._serialized_start = 3202
+    _RELATIONCOMMON._serialized_end = 3293
+    _SQL._serialized_start = 3296
+    _SQL._serialized_end = 3527
+    _SQL_ARGSENTRY._serialized_start = 3437
+    _SQL_ARGSENTRY._serialized_end = 3527
+    _READ._serialized_start = 3530
+    _READ._serialized_end = 4193
+    _READ_NAMEDTABLE._serialized_start = 3708
+    _READ_NAMEDTABLE._serialized_end = 3900
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 3842
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 3900
+    _READ_DATASOURCE._serialized_start = 3903
+    _READ_DATASOURCE._serialized_end = 4180
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3842
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3900
+    _PROJECT._serialized_start = 4195
+    _PROJECT._serialized_end = 4312
+    _FILTER._serialized_start = 4314
+    _FILTER._serialized_end = 4426
+    _JOIN._serialized_start = 4429
+    _JOIN._serialized_end = 4900
+    _JOIN_JOINTYPE._serialized_start = 4692
+    _JOIN_JOINTYPE._serialized_end = 4900
+    _SETOPERATION._serialized_start = 4903
+    _SETOPERATION._serialized_end = 5382
+    _SETOPERATION_SETOPTYPE._serialized_start = 5219
+    _SETOPERATION_SETOPTYPE._serialized_end = 5333
+    _LIMIT._serialized_start = 5384
+    _LIMIT._serialized_end = 5460
+    _OFFSET._serialized_start = 5462
+    _OFFSET._serialized_end = 5541
+    _TAIL._serialized_start = 5543
+    _TAIL._serialized_end = 5618
+    _AGGREGATE._serialized_start = 5621
+    _AGGREGATE._serialized_end = 6203
+    _AGGREGATE_PIVOT._serialized_start = 5960
+    _AGGREGATE_PIVOT._serialized_end = 6071
+    _AGGREGATE_GROUPTYPE._serialized_start = 6074
+    _AGGREGATE_GROUPTYPE._serialized_end = 6203
+    _SORT._serialized_start = 6206
+    _SORT._serialized_end = 6366
+    _DROP._serialized_start = 6369
+    _DROP._serialized_end = 6510
+    _DEDUPLICATE._serialized_start = 6513
+    _DEDUPLICATE._serialized_end = 6753
+    _LOCALRELATION._serialized_start = 6755
+    _LOCALRELATION._serialized_end = 6844
+    _CACHEDLOCALRELATION._serialized_start = 6846
+    _CACHEDLOCALRELATION._serialized_end = 6941
+    _CACHEDREMOTERELATION._serialized_start = 6943
+    _CACHEDREMOTERELATION._serialized_end = 6998
+    _SAMPLE._serialized_start = 7001
+    _SAMPLE._serialized_end = 7274
+    _RANGE._serialized_start = 7277
+    _RANGE._serialized_end = 7422
+    _SUBQUERYALIAS._serialized_start = 7424
+    _SUBQUERYALIAS._serialized_end = 7538
+    _REPARTITION._serialized_start = 7541
+    _REPARTITION._serialized_end = 7683
+    _SHOWSTRING._serialized_start = 7686
+    _SHOWSTRING._serialized_end = 7828
+    _HTMLSTRING._serialized_start = 7830
+    _HTMLSTRING._serialized_end = 7944
+    _STATSUMMARY._serialized_start = 7946
+    _STATSUMMARY._serialized_end = 8038
+    _STATDESCRIBE._serialized_start = 8040
+    _STATDESCRIBE._serialized_end = 8121
+    _STATCROSSTAB._serialized_start = 8123
+    _STATCROSSTAB._serialized_end = 8224
+    _STATCOV._serialized_start = 8226
+    _STATCOV._serialized_end = 8322
+    _STATCORR._serialized_start = 8325
+    _STATCORR._serialized_end = 8462
+    _STATAPPROXQUANTILE._serialized_start = 8465
+    _STATAPPROXQUANTILE._serialized_end = 8629
+    _STATFREQITEMS._serialized_start = 8631
+    _STATFREQITEMS._serialized_end = 8756
+    _STATSAMPLEBY._serialized_start = 8759
+    _STATSAMPLEBY._serialized_end = 9068
+    _STATSAMPLEBY_FRACTION._serialized_start = 8960
+    _STATSAMPLEBY_FRACTION._serialized_end = 9059
+    _NAFILL._serialized_start = 9071
+    _NAFILL._serialized_end = 9205
+    _NADROP._serialized_start = 9208
+    _NADROP._serialized_end = 9342
+    _NAREPLACE._serialized_start = 9345
+    _NAREPLACE._serialized_end = 9641
+    _NAREPLACE_REPLACEMENT._serialized_start = 9500
+    _NAREPLACE_REPLACEMENT._serialized_end = 9641
+    _TODF._serialized_start = 9643
+    _TODF._serialized_end = 9731
+    _WITHCOLUMNSRENAMED._serialized_start = 9734
+    _WITHCOLUMNSRENAMED._serialized_end = 9973
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 9906
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 9973
+    _WITHCOLUMNS._serialized_start = 9975
+    _WITHCOLUMNS._serialized_end = 10094
+    _WITHWATERMARK._serialized_start = 10097
+    _WITHWATERMARK._serialized_end = 10231
+    _HINT._serialized_start = 10234
+    _HINT._serialized_end = 10366
+    _UNPIVOT._serialized_start = 10369
+    _UNPIVOT._serialized_end = 10696
+    _UNPIVOT_VALUES._serialized_start = 10626
+    _UNPIVOT_VALUES._serialized_end = 10685
+    _TOSCHEMA._serialized_start = 10698
+    _TOSCHEMA._serialized_end = 10804
+    _REPARTITIONBYEXPRESSION._serialized_start = 10807
+    _REPARTITIONBYEXPRESSION._serialized_end = 11010
+    _MAPPARTITIONS._serialized_start = 11013
+    _MAPPARTITIONS._serialized_end = 11194
+    _GROUPMAP._serialized_start = 11197
+    _GROUPMAP._serialized_end = 11832
+    _COGROUPMAP._serialized_start = 11835
+    _COGROUPMAP._serialized_end = 12361
+    _APPLYINPANDASWITHSTATE._serialized_start = 12364
+    _APPLYINPANDASWITHSTATE._serialized_end = 12721
+    _COLLECTMETRICS._serialized_start = 12724
+    _COLLECTMETRICS._serialized_end = 12860
+    _PARSE._serialized_start = 12863
+    _PARSE._serialized_end = 13251
+    _PARSE_OPTIONSENTRY._serialized_start = 3842
+    _PARSE_OPTIONSENTRY._serialized_end = 3900
+    _PARSE_PARSEFORMAT._serialized_start = 13152
+    _PARSE_PARSEFORMAT._serialized_end = 13240
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index 28609b41713..8909d438c9d 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -98,6 +98,7 @@ class Relation(google.protobuf.message.Message):
     APPLY_IN_PANDAS_WITH_STATE_FIELD_NUMBER: builtins.int
     HTML_STRING_FIELD_NUMBER: builtins.int
     CACHED_LOCAL_RELATION_FIELD_NUMBER: builtins.int
+    CACHED_REMOTE_RELATION_FIELD_NUMBER: builtins.int
     FILL_NA_FIELD_NUMBER: builtins.int
     DROP_NA_FIELD_NUMBER: builtins.int
     REPLACE_FIELD_NUMBER: builtins.int
@@ -185,6 +186,8 @@ class Relation(google.protobuf.message.Message):
     @property
     def cached_local_relation(self) -> global___CachedLocalRelation: ...
     @property
+    def cached_remote_relation(self) -> global___CachedRemoteRelation: ...
+    @property
     def fill_na(self) -> global___NAFill:
         """NA functions"""
     @property
@@ -257,6 +260,7 @@ class Relation(google.protobuf.message.Message):
         apply_in_pandas_with_state: global___ApplyInPandasWithState | None = ...,
         html_string: global___HtmlString | None = ...,
         cached_local_relation: global___CachedLocalRelation | None = ...,
+        cached_remote_relation: global___CachedRemoteRelation | None = ...,
         fill_na: global___NAFill | None = ...,
         drop_na: global___NADrop | None = ...,
         replace: global___NAReplace | None = ...,
@@ -283,6 +287,8 @@ class Relation(google.protobuf.message.Message):
             b"approx_quantile",
             "cached_local_relation",
             b"cached_local_relation",
+            "cached_remote_relation",
+            b"cached_remote_relation",
             "catalog",
             b"catalog",
             "co_group_map",
@@ -390,6 +396,8 @@ class Relation(google.protobuf.message.Message):
             b"approx_quantile",
             "cached_local_relation",
             b"cached_local_relation",
+            "cached_remote_relation",
+            b"cached_remote_relation",
             "catalog",
             b"catalog",
             "co_group_map",
@@ -524,6 +532,7 @@ class Relation(google.protobuf.message.Message):
         "apply_in_pandas_with_state",
         "html_string",
         "cached_local_relation",
+        "cached_remote_relation",
         "fill_na",
         "drop_na",
         "replace",
@@ -1608,6 +1617,25 @@ class CachedLocalRelation(google.protobuf.message.Message):
 
 global___CachedLocalRelation = CachedLocalRelation
 
+class CachedRemoteRelation(google.protobuf.message.Message):
+    """Represents a remote relation that has been cached on server."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    RELATION_ID_FIELD_NUMBER: builtins.int
+    relation_id: builtins.str
+    """(Required) ID of the remote related (assigned by the service)."""
+    def __init__(
+        self,
+        *,
+        relation_id: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["relation_id", b"relation_id"]
+    ) -> None: ...
+
+global___CachedRemoteRelation = CachedRemoteRelation
+
 class Sample(google.protobuf.message.Message):
     """Relation of type [[Sample]] that samples a fraction of the dataset."""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org