Posted to commits@spark.apache.org by ru...@apache.org on 2023/03/06 05:14:20 UTC

[spark] branch branch-3.4 updated: [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0d9fb849ffd [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect
0d9fb849ffd is described below

commit 0d9fb849ffdd9dc4beafbf444c575581538b5a34
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Mon Mar 6 13:13:22 2023 +0800

    [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect
    
    ### What changes were proposed in this pull request?
    
    Support SameSemantics in Spark Connect.
    
    ### Why are the changes needed?
    
    API coverage
    
    ### Does this PR introduce _any_ user-facing change?
    
    `sameSemantics` API calls from users now return a result instead of throwing an exception.
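    
    A minimal usage sketch (assumes a Spark Connect session bound to `spark`, mirroring the test added in this patch):
    
    ```python
    df1 = spark.sql("SELECT 1 AS id")
    df2 = spark.sql("SELECT 1 AS id")
    df1.sameSemantics(df2)  # True: the two logical plans are equal
    ```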
    
    ### How was this patch tested?
    
    UT
    
    Closes #40228 from amaliujia/sameSemantics.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
    (cherry picked from commit 60534836b76f4af6983e65744850dfccb1d5dc9e)
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  17 +-
 .../scala/org/apache/spark/sql/SparkSession.scala  |   4 +
 .../sql/connect/client/SparkConnectClient.scala    |  14 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |   6 +
 .../src/main/protobuf/spark/connect/base.proto     |  16 ++
 .../service/SparkConnectAnalyzeHandler.scala       |  12 ++
 python/pyspark/sql/connect/client.py               |  17 ++
 python/pyspark/sql/connect/dataframe.py            |  11 +-
 python/pyspark/sql/connect/proto/base_pb2.py       | 214 ++++++++++++---------
 python/pyspark/sql/connect/proto/base_pb2.pyi      |  64 ++++++
 .../sql/tests/connect/test_connect_basic.py        |   6 +-
 .../sql/tests/connect/test_parity_dataframe.py     |   3 +-
 12 files changed, 284 insertions(+), 100 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 588e62768ac..74dd58f4903 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2741,8 +2741,23 @@ class Dataset[T] private[sql] (
     throw new UnsupportedOperationException("localCheckpoint is not implemented.")
   }
 
+  /**
+   * Returns `true` when the logical query plans inside both [[Dataset]]s are equal and therefore
+   * return the same results.
+   *
+   * @note
+   *   The equality comparison here is simplified by tolerating cosmetic differences such as
+   *   attribute names.
+   * @note
+   *   This API can compare both [[Dataset]]s and still return `false` for [[Dataset]]s that
+   *   return the same results, for instance, from different plans. Such false-negative
+   *   semantics can be useful, for example, when caching. This comparison may not be fast
+   *   because it executes an RPC call.
+   * @since 3.4.0
+   */
+  @DeveloperApi
   def sameSemantics(other: Dataset[T]): Boolean = {
-    throw new UnsupportedOperationException("sameSemantics is not implemented.")
+    sparkSession.sameSemantics(this.plan, other.plan)
   }
 
   def semanticHash(): Int = {
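The false-negative behavior described in the Scaladoc above can be observed when two plans produce identical results through different plan shapes. A hedged PySpark sketch (assumes a Spark Connect session `spark`):

```python
a = spark.range(10)
b = spark.range(10).filter("id >= 0")  # same results, structurally different plan
a.sameSemantics(b)  # False: the comparison is plan-based, not result-based

c = spark.range(10)
d = spark.range(10)
c.sameSemantics(d)  # True: identical plans compare equal
```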
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 31a63720c5c..85f576ec515 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -400,6 +400,10 @@ class SparkSession private[sql] (
     client.analyze(method, Some(plan), explainMode)
   }
 
+  private[sql] def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): Boolean = {
+    client.sameSemantics(plan, otherPlan).getSameSemantics.getResult
+  }
+
   private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
     val value = client.execute(plan)
     val result = new SparkResult(value, allocator, encoder)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 8828a4a87e6..05aa191a4dd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -141,6 +141,20 @@ private[sql] class SparkConnectClient(
         builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
       case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
     }
+    analyze(builder)
+  }
+
+  def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): proto.AnalyzePlanResponse = {
+    val builder = proto.AnalyzePlanRequest.newBuilder()
+    builder.setSameSemantics(
+      proto.AnalyzePlanRequest.SameSemantics
+        .newBuilder()
+        .setTargetPlan(plan)
+        .setOtherPlan(otherPlan))
+    analyze(builder)
+  }
+
+  private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
     val request = builder
       .setUserContext(userContext)
       .setClientId(sessionId)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 089645a2d8d..11e28f538e8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -628,6 +628,12 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     val result = spark.createDataFrame(data.asJava, schema).collect()
     assert(result === data)
   }
+
+  test("SameSemantics") {
+    val plan = spark.sql("select 1")
+    val otherPlan = spark.sql("select 1")
+    assert(plan.sameSemantics(otherPlan))
+  }
 }
 
 private[sql] case class MyType(id: Long, a: Double, b: Double)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 09407a99119..2252d91c9ff 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -79,6 +79,7 @@ message AnalyzePlanRequest {
     InputFiles input_files = 9;
     SparkVersion spark_version = 10;
     DDLParse ddl_parse = 11;
+    SameSemantics same_semantics = 12;
   }
 
   message Schema {
@@ -145,6 +146,16 @@ message AnalyzePlanRequest {
     // (Required) The DDL formatted string to be parsed.
     string ddl_string = 1;
   }
+
+  // Returns `true` when the logical query plans are equal and therefore return the same results.
+  message SameSemantics {
+    // (Required) The plan to be compared.
+    Plan target_plan = 1;
+
+    // (Required) The other plan to be compared.
+    Plan other_plan = 2;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
@@ -161,6 +172,7 @@ message AnalyzePlanResponse {
     InputFiles input_files = 7;
     SparkVersion spark_version = 8;
     DDLParse ddl_parse = 9;
+    SameSemantics same_semantics = 10;
   }
 
   message Schema {
@@ -195,6 +207,10 @@ message AnalyzePlanResponse {
   message DDLParse {
     DataType parsed = 1;
   }
+
+  message SameSemantics {
+    bool result = 1;
+  }
 }
 
 // A request to be executed by the service.
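For reference, the request/response pair defined above round-trips through the generated Python stubs roughly as follows (a sketch showing only the new `same_semantics` fields; the empty `Plan` messages stand in for real plans):

```python
from pyspark.sql.connect.proto import base_pb2 as pb2

req = pb2.AnalyzePlanRequest()
req.same_semantics.target_plan.CopyFrom(pb2.Plan())  # (Required) the plan to be compared
req.same_semantics.other_plan.CopyFrom(pb2.Plan())   # (Required) the other plan

resp = pb2.AnalyzePlanResponse()
resp.same_semantics.result = True                    # server-computed comparison
```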
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index f6adcd852e8..e3d4da66a08 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -140,6 +140,18 @@ private[connect] class SparkConnectAnalyzeHandler(
             .setParsed(DataTypeProtoConverter.toConnectProtoType(schema))
             .build())
 
+      case proto.AnalyzePlanRequest.AnalyzeCase.SAME_SEMANTICS =>
+        val target = Dataset.ofRows(
+          session,
+          planner.transformRelation(request.getSameSemantics.getTargetPlan.getRoot))
+        val other = Dataset.ofRows(
+          session,
+          planner.transformRelation(request.getSameSemantics.getOtherPlan.getRoot))
+        builder.setSameSemantics(
+          proto.AnalyzePlanResponse.SameSemantics
+            .newBuilder()
+            .setResult(target.sameSemantics(other)))
+
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
 
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 8be0c997538..2594640aa3e 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -401,6 +401,7 @@ class AnalyzeResult:
         input_files: Optional[List[str]],
         spark_version: Optional[str],
         parsed: Optional[pb2.DataType],
+        is_same_semantics: Optional[bool],
     ):
         self.schema = schema
         self.explain_string = explain_string
@@ -410,6 +411,7 @@ class AnalyzeResult:
         self.input_files = input_files
         self.spark_version = spark_version
         self.parsed = parsed
+        self.is_same_semantics = is_same_semantics
 
     @classmethod
     def fromProto(cls, pb: Any) -> "AnalyzeResult":
@@ -421,6 +423,7 @@ class AnalyzeResult:
         input_files: Optional[List[str]] = None
         spark_version: Optional[str] = None
         parsed: Optional[pb2.DataType] = None
+        is_same_semantics: Optional[bool] = None
 
         if pb.HasField("schema"):
             schema = pb.schema.schema
@@ -438,6 +441,8 @@ class AnalyzeResult:
             spark_version = pb.spark_version.version
         elif pb.HasField("ddl_parse"):
             parsed = pb.ddl_parse.parsed
+        elif pb.HasField("same_semantics"):
+            is_same_semantics = pb.same_semantics.result
         else:
             raise SparkConnectException("No analyze result found!")
 
@@ -450,6 +455,7 @@ class AnalyzeResult:
             input_files,
             spark_version,
             parsed,
+            is_same_semantics,
         )
 
 
@@ -690,6 +696,14 @@ class SparkConnectClient(object):
         else:
             return (None, properties)
 
+    def same_semantics(self, plan: pb2.Plan, other: pb2.Plan) -> bool:
+        """
+        Return whether two plans have the same semantics.
+        """
+        result = self._analyze(method="same_semantics", plan=plan, other=other).is_same_semantics
+        assert result is not None
+        return result
+
     def close(self) -> None:
         """
         Close the channel.
@@ -765,6 +779,9 @@ class SparkConnectClient(object):
             req.spark_version.SetInParent()
         elif method == "ddl_parse":
             req.ddl_parse.ddl_string = cast(str, kwargs.get("ddl_string"))
+        elif method == "same_semantics":
+            req.same_semantics.target_plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+            req.same_semantics.other_plan.CopyFrom(cast(pb2.Plan, kwargs.get("other")))
         else:
             raise ValueError(f"Unknown Analyze method: {method}")
 
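Putting the client pieces together, `DataFrame.sameSemantics` in the next file reduces to the following call (a sketch; `client` stands for an already-connected `SparkConnectClient`, and `df1`/`df2` are hypothetical Connect DataFrames):

```python
same = client.same_semantics(
    plan=df1._plan.to_proto(client),
    other=df2._plan.to_proto(client),
)
assert isinstance(same, bool)
```
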
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index fdc71d46632..38e245f0335 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1610,8 +1610,15 @@ class DataFrame:
     def semanticHash(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("semanticHash() is not implemented.")
 
-    def sameSemantics(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("sameSemantics() is not implemented.")
+    def sameSemantics(self, other: "DataFrame") -> bool:
+        assert self._plan is not None
+        assert other._plan is not None
+        return self._session.client.same_semantics(
+            plan=self._plan.to_proto(self._session.client),
+            other=other._plan.to_proto(other._session.client),
+        )
+
+    sameSemantics.__doc__ = PySparkDataFrame.sameSemantics.__doc__
 
     def writeTo(self, table: str) -> "DataFrameWriterV2":
         assert self._plan is not None
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 9ece82ed535..6d41ce28c7c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
 )
 
 
@@ -52,6 +52,7 @@ _ANALYZEPLANREQUEST_ISSTREAMING = _ANALYZEPLANREQUEST.nested_types_by_name["IsSt
 _ANALYZEPLANREQUEST_INPUTFILES = _ANALYZEPLANREQUEST.nested_types_by_name["InputFiles"]
 _ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["SparkVersion"]
 _ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"]
+_ANALYZEPLANREQUEST_SAMESEMANTICS = _ANALYZEPLANREQUEST.nested_types_by_name["SameSemantics"]
 _ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"]
 _ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"]
 _ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"]
@@ -61,6 +62,7 @@ _ANALYZEPLANRESPONSE_ISSTREAMING = _ANALYZEPLANRESPONSE.nested_types_by_name["Is
 _ANALYZEPLANRESPONSE_INPUTFILES = _ANALYZEPLANRESPONSE.nested_types_by_name["InputFiles"]
 _ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["SparkVersion"]
 _ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"]
+_ANALYZEPLANRESPONSE_SAMESEMANTICS = _ANALYZEPLANRESPONSE.nested_types_by_name["SameSemantics"]
 _EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
 _EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
 _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT = _EXECUTEPLANRESPONSE.nested_types_by_name[
@@ -203,6 +205,15 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
                 # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.DDLParse)
             },
         ),
+        "SameSemantics": _reflection.GeneratedProtocolMessageType(
+            "SameSemantics",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_SAMESEMANTICS,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SameSemantics)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANREQUEST,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest)
@@ -217,6 +228,7 @@ _sym_db.RegisterMessage(AnalyzePlanRequest.IsStreaming)
 _sym_db.RegisterMessage(AnalyzePlanRequest.InputFiles)
 _sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion)
 _sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse)
+_sym_db.RegisterMessage(AnalyzePlanRequest.SameSemantics)
 
 AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
     "AnalyzePlanResponse",
@@ -294,6 +306,15 @@ AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
                 # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.DDLParse)
             },
         ),
+        "SameSemantics": _reflection.GeneratedProtocolMessageType(
+            "SameSemantics",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_SAMESEMANTICS,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SameSemantics)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANRESPONSE,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse)
@@ -308,6 +329,7 @@ _sym_db.RegisterMessage(AnalyzePlanResponse.IsStreaming)
 _sym_db.RegisterMessage(AnalyzePlanResponse.InputFiles)
 _sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion)
 _sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse)
+_sym_db.RegisterMessage(AnalyzePlanResponse.SameSemantics)
 
 ExecutePlanRequest = _reflection.GeneratedProtocolMessageType(
     "ExecutePlanRequest",
@@ -598,97 +620,101 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _USERCONTEXT._serialized_start = 309
     _USERCONTEXT._serialized_end = 431
     _ANALYZEPLANREQUEST._serialized_start = 434
-    _ANALYZEPLANREQUEST._serialized_end = 1876
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1205
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1254
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1257
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1572
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1400
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1572
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1574
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1627
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1629
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1679
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1681
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1735
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1737
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1790
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1792
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1806
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1808
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1849
-    _ANALYZEPLANRESPONSE._serialized_start = 1879
-    _ANALYZEPLANRESPONSE._serialized_end = 2949
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2558
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2615
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2617
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2665
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2667
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 2712
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 2714
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 2750
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 2752
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 2800
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 2802
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 2836
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 2838
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 2878
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 2880
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 2939
-    _EXECUTEPLANREQUEST._serialized_start = 2952
-    _EXECUTEPLANREQUEST._serialized_end = 3159
-    _EXECUTEPLANRESPONSE._serialized_start = 3162
-    _EXECUTEPLANRESPONSE._serialized_end = 4386
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3617
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 3688
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 3690
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 3751
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 3754
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4271
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 3849
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4181
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4058
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4181
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4183
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4271
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4273
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4369
-    _KEYVALUE._serialized_start = 4388
-    _KEYVALUE._serialized_end = 4453
-    _CONFIGREQUEST._serialized_start = 4456
-    _CONFIGREQUEST._serialized_end = 5482
-    _CONFIGREQUEST_OPERATION._serialized_start = 4674
-    _CONFIGREQUEST_OPERATION._serialized_end = 5172
-    _CONFIGREQUEST_SET._serialized_start = 5174
-    _CONFIGREQUEST_SET._serialized_end = 5226
-    _CONFIGREQUEST_GET._serialized_start = 5228
-    _CONFIGREQUEST_GET._serialized_end = 5253
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5255
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5318
-    _CONFIGREQUEST_GETOPTION._serialized_start = 5320
-    _CONFIGREQUEST_GETOPTION._serialized_end = 5351
-    _CONFIGREQUEST_GETALL._serialized_start = 5353
-    _CONFIGREQUEST_GETALL._serialized_end = 5401
-    _CONFIGREQUEST_UNSET._serialized_start = 5403
-    _CONFIGREQUEST_UNSET._serialized_end = 5430
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5432
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5466
-    _CONFIGRESPONSE._serialized_start = 5484
-    _CONFIGRESPONSE._serialized_end = 5604
-    _ADDARTIFACTSREQUEST._serialized_start = 5607
-    _ADDARTIFACTSREQUEST._serialized_end = 6422
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 5954
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6007
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6009
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6120
-    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6122
-    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6215
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6218
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6411
-    _ADDARTIFACTSRESPONSE._serialized_start = 6425
-    _ADDARTIFACTSRESPONSE._serialized_end = 6613
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6532
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6613
-    _SPARKCONNECTSERVICE._serialized_start = 6616
-    _SPARKCONNECTSERVICE._serialized_end = 6981
+    _ANALYZEPLANREQUEST._serialized_end = 2089
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1295
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1344
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1347
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1662
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1490
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1662
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1664
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1717
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1719
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1769
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1771
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1825
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1827
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1880
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1882
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1896
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1898
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1939
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1941
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2062
+    _ANALYZEPLANRESPONSE._serialized_start = 2092
+    _ANALYZEPLANRESPONSE._serialized_end = 3294
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2862
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2919
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2921
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2969
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2971
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3016
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3018
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3054
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3056
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3104
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3106
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3140
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3142
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3182
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3184
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3243
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3245
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3284
+    _EXECUTEPLANREQUEST._serialized_start = 3297
+    _EXECUTEPLANREQUEST._serialized_end = 3504
+    _EXECUTEPLANRESPONSE._serialized_start = 3507
+    _EXECUTEPLANRESPONSE._serialized_end = 4731
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3962
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4033
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4035
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4096
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4099
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4616
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4194
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4526
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4403
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4526
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4528
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4616
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4618
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4714
+    _KEYVALUE._serialized_start = 4733
+    _KEYVALUE._serialized_end = 4798
+    _CONFIGREQUEST._serialized_start = 4801
+    _CONFIGREQUEST._serialized_end = 5827
+    _CONFIGREQUEST_OPERATION._serialized_start = 5019
+    _CONFIGREQUEST_OPERATION._serialized_end = 5517
+    _CONFIGREQUEST_SET._serialized_start = 5519
+    _CONFIGREQUEST_SET._serialized_end = 5571
+    _CONFIGREQUEST_GET._serialized_start = 5573
+    _CONFIGREQUEST_GET._serialized_end = 5598
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5600
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5663
+    _CONFIGREQUEST_GETOPTION._serialized_start = 5665
+    _CONFIGREQUEST_GETOPTION._serialized_end = 5696
+    _CONFIGREQUEST_GETALL._serialized_start = 5698
+    _CONFIGREQUEST_GETALL._serialized_end = 5746
+    _CONFIGREQUEST_UNSET._serialized_start = 5748
+    _CONFIGREQUEST_UNSET._serialized_end = 5775
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5777
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5811
+    _CONFIGRESPONSE._serialized_start = 5829
+    _CONFIGRESPONSE._serialized_end = 5949
+    _ADDARTIFACTSREQUEST._serialized_start = 5952
+    _ADDARTIFACTSREQUEST._serialized_end = 6767
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6299
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6352
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6354
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6465
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6467
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6560
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6563
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6756
+    _ADDARTIFACTSRESPONSE._serialized_start = 6770
+    _ADDARTIFACTSRESPONSE._serialized_end = 6958
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6877
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6958
+    _SPARKCONNECTSERVICE._serialized_start = 6961
+    _SPARKCONNECTSERVICE._serialized_end = 7326
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 56e1d2e416f..2e9a877b658 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -318,6 +318,38 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["ddl_string", b"ddl_string"]
         ) -> None: ...
 
+    class SameSemantics(google.protobuf.message.Message):
+        """Returns `true` when the logical query plans  are equal and therefore return same results."""
+
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        TARGET_PLAN_FIELD_NUMBER: builtins.int
+        OTHER_PLAN_FIELD_NUMBER: builtins.int
+        @property
+        def target_plan(self) -> global___Plan:
+            """(Required) The plan to be compared."""
+        @property
+        def other_plan(self) -> global___Plan:
+            """(Required) The other plan to be compared."""
+        def __init__(
+            self,
+            *,
+            target_plan: global___Plan | None = ...,
+            other_plan: global___Plan | None = ...,
+        ) -> None: ...
+        def HasField(
+            self,
+            field_name: typing_extensions.Literal[
+                "other_plan", b"other_plan", "target_plan", b"target_plan"
+            ],
+        ) -> builtins.bool: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "other_plan", b"other_plan", "target_plan", b"target_plan"
+            ],
+        ) -> None: ...
+
     CLIENT_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -329,6 +361,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     INPUT_FILES_FIELD_NUMBER: builtins.int
     SPARK_VERSION_FIELD_NUMBER: builtins.int
     DDL_PARSE_FIELD_NUMBER: builtins.int
+    SAME_SEMANTICS_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     """(Required)
 
@@ -359,6 +392,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     def spark_version(self) -> global___AnalyzePlanRequest.SparkVersion: ...
     @property
     def ddl_parse(self) -> global___AnalyzePlanRequest.DDLParse: ...
+    @property
+    def same_semantics(self) -> global___AnalyzePlanRequest.SameSemantics: ...
     def __init__(
         self,
         *,
@@ -373,6 +408,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         input_files: global___AnalyzePlanRequest.InputFiles | None = ...,
         spark_version: global___AnalyzePlanRequest.SparkVersion | None = ...,
         ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ...,
+        same_semantics: global___AnalyzePlanRequest.SameSemantics | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -393,6 +429,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "same_semantics",
+            b"same_semantics",
             "schema",
             b"schema",
             "spark_version",
@@ -424,6 +462,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "same_semantics",
+            b"same_semantics",
             "schema",
             b"schema",
             "spark_version",
@@ -450,6 +490,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         "input_files",
         "spark_version",
         "ddl_parse",
+        "same_semantics",
     ] | None: ...
 
 global___AnalyzePlanRequest = AnalyzePlanRequest
@@ -583,6 +624,20 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["parsed", b"parsed"]
         ) -> None: ...
 
+    class SameSemantics(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        RESULT_FIELD_NUMBER: builtins.int
+        result: builtins.bool
+        def __init__(
+            self,
+            *,
+            result: builtins.bool = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["result", b"result"]
+        ) -> None: ...
+
     CLIENT_ID_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
     EXPLAIN_FIELD_NUMBER: builtins.int
@@ -592,6 +647,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     INPUT_FILES_FIELD_NUMBER: builtins.int
     SPARK_VERSION_FIELD_NUMBER: builtins.int
     DDL_PARSE_FIELD_NUMBER: builtins.int
+    SAME_SEMANTICS_FIELD_NUMBER: builtins.int
     client_id: builtins.str
     @property
     def schema(self) -> global___AnalyzePlanResponse.Schema: ...
@@ -609,6 +665,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     def spark_version(self) -> global___AnalyzePlanResponse.SparkVersion: ...
     @property
     def ddl_parse(self) -> global___AnalyzePlanResponse.DDLParse: ...
+    @property
+    def same_semantics(self) -> global___AnalyzePlanResponse.SameSemantics: ...
     def __init__(
         self,
         *,
@@ -621,6 +679,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
         input_files: global___AnalyzePlanResponse.InputFiles | None = ...,
         spark_version: global___AnalyzePlanResponse.SparkVersion | None = ...,
         ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ...,
+        same_semantics: global___AnalyzePlanResponse.SameSemantics | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -637,6 +696,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"is_streaming",
             "result",
             b"result",
+            "same_semantics",
+            b"same_semantics",
             "schema",
             b"schema",
             "spark_version",
@@ -662,6 +723,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"is_streaming",
             "result",
             b"result",
+            "same_semantics",
+            b"same_semantics",
             "schema",
             b"schema",
             "spark_version",
@@ -681,6 +744,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
         "input_files",
         "spark_version",
         "ddl_parse",
+        "same_semantics",
     ] | None: ...
 
 global___AnalyzePlanResponse = AnalyzePlanResponse
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 68ac8b1dd7c..806fe6e2329 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2809,6 +2809,11 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             self.spark.version,
         )
 
+    def test_same_semantics(self):
+        plan = self.connect.sql("SELECT 1")
+        other = self.connect.sql("SELECT 1")
+        self.assertTrue(plan.sameSemantics(other))
+
     def test_unsupported_functions(self):
         # SPARK-41225: Disable unsupported functions.
         df = self.connect.read.table(self.tbl_name)
@@ -2825,7 +2830,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
             "localCheckpoint",
             "_repr_html_",
             "semanticHash",
-            "sameSemantics",
         ):
             with self.assertRaises(NotImplementedError):
                 getattr(df, f)()
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 79626586f73..31dee6a19d2 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -60,8 +60,7 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
     def test_repr_behaviors(self):
         super().test_repr_behaviors()
 
-    # TODO(SPARK-41874): Implement DataFrame `sameSemantics`
-    @unittest.skip("Fails in Spark Connect, should enable.")
+    @unittest.skip("Spark Connect does not SparkContext but the tests depend on them.")
     def test_same_semantics_error(self):
         super().test_same_semantics_error()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org