Posted to commits@spark.apache.org by ru...@apache.org on 2023/03/06 05:13:56 UTC
[spark] branch master updated: [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 60534836b76 [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect
60534836b76 is described below
commit 60534836b76f4af6983e65744850dfccb1d5dc9e
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Mon Mar 6 13:13:22 2023 +0800
[SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect
### What changes were proposed in this pull request?
Support SameSemantics in Spark Connect.
### Why are the changes needed?
API coverage
### Does this PR introduce _any_ user-facing change?
SameSemantics API calls from users now return a result instead of throwing an exception.
### How was this patch tested?
UT
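For context, a minimal usage sketch of the new API from the Python Spark Connect client, matching the pattern used in the unit tests below. The remote URL is illustrative; any running Spark Connect server works:

    from pyspark.sql import SparkSession

    # Connect to a Spark Connect server (the URL is an assumption for illustration).
    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    df1 = spark.sql("SELECT 1")
    df2 = spark.sql("SELECT 1")
    # Issues a single AnalyzePlanRequest RPC and returns a bool.
    assert df1.sameSemantics(df2)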
Closes #40228 from amaliujia/sameSemantics.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 17 +-
.../scala/org/apache/spark/sql/SparkSession.scala | 4 +
.../sql/connect/client/SparkConnectClient.scala | 14 ++
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 6 +
.../src/main/protobuf/spark/connect/base.proto | 16 ++
.../service/SparkConnectAnalyzeHandler.scala | 12 ++
python/pyspark/sql/connect/client.py | 17 ++
python/pyspark/sql/connect/dataframe.py | 11 +-
python/pyspark/sql/connect/proto/base_pb2.py | 214 ++++++++++++---------
python/pyspark/sql/connect/proto/base_pb2.pyi | 64 ++++++
.../sql/tests/connect/test_connect_basic.py | 6 +-
.../sql/tests/connect/test_parity_dataframe.py | 3 +-
12 files changed, 284 insertions(+), 100 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 588e62768ac..74dd58f4903 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2741,8 +2741,23 @@ class Dataset[T] private[sql] (
throw new UnsupportedOperationException("localCheckpoint is not implemented.")
}
+ /**
+ * Returns `true` when the logical query plans inside both [[Dataset]]s are equal and
+ * therefore return the same results.
+ *
+ * @note
+ * The equality comparison here is simplified by tolerating cosmetic differences such as
+ * attribute names.
+ * @note
+ * This API can compare both [[Dataset]]s but can still return `false` on [[Dataset]]s
+ * that return the same results, for instance, when they come from different plans. Such
+ * false-negative semantics can be useful, for example, when caching. This comparison may
+ * not be fast because it executes an RPC call.
+ * @since 3.4.0
+ */
+ @DeveloperApi
def sameSemantics(other: Dataset[T]): Boolean = {
- throw new UnsupportedOperationException("sameSemantics is not implemented.")
+ sparkSession.sameSemantics(this.plan, other.plan)
}
def semanticHash(): Int = {
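The second @note above deserves a concrete illustration: two plans that produce identical results can still compare unequal when they are structurally different. A hedged Python sketch against a Connect session (session setup assumed as in the usage example above):

    df_a = spark.sql("SELECT 1 + 1 AS v")
    df_b = spark.sql("SELECT 2 AS v")

    # Both queries return a single row with v = 2, but the analyzed logical
    # plans differ structurally, so False is permitted here: the contract
    # allows false negatives, never false positives.
    print(df_a.sameSemantics(df_b))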
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 31a63720c5c..85f576ec515 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -400,6 +400,10 @@ class SparkSession private[sql] (
client.analyze(method, Some(plan), explainMode)
}
+ private[sql] def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): Boolean = {
+ client.sameSemantics(plan, otherPlan).getSameSemantics.getResult
+ }
+
private[sql] def execute[T](plan: proto.Plan, encoder: AgnosticEncoder[T]): SparkResult[T] = {
val value = client.execute(plan)
val result = new SparkResult(value, allocator, encoder)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 8828a4a87e6..05aa191a4dd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -141,6 +141,20 @@ private[sql] class SparkConnectClient(
builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
}
+ analyze(builder)
+ }
+
+ def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): proto.AnalyzePlanResponse = {
+ val builder = proto.AnalyzePlanRequest.newBuilder()
+ builder.setSameSemantics(
+ proto.AnalyzePlanRequest.SameSemantics
+ .newBuilder()
+ .setTargetPlan(plan)
+ .setOtherPlan(otherPlan))
+ analyze(builder)
+ }
+
+ private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
val request = builder
.setUserContext(userContext)
.setClientId(sessionId)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 089645a2d8d..11e28f538e8 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -628,6 +628,12 @@ class ClientE2ETestSuite extends RemoteSparkSession {
val result = spark.createDataFrame(data.asJava, schema).collect()
assert(result === data)
}
+
+ test("SameSemantics") {
+ val plan = spark.sql("select 1")
+ val otherPlan = spark.sql("select 1")
+ assert(plan.sameSemantics(otherPlan))
+ }
}
private[sql] case class MyType(id: Long, a: Double, b: Double)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 09407a99119..2252d91c9ff 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -79,6 +79,7 @@ message AnalyzePlanRequest {
InputFiles input_files = 9;
SparkVersion spark_version = 10;
DDLParse ddl_parse = 11;
+ SameSemantics same_semantics = 12;
}
message Schema {
@@ -145,6 +146,16 @@ message AnalyzePlanRequest {
// (Required) The DDL formatted string to be parsed.
string ddl_string = 1;
}
+
+
+ // Returns `true` when the logical query plans are equal and therefore return the same results.
+ message SameSemantics {
+ // (Required) The plan to be compared.
+ Plan target_plan = 1;
+
+ // (Required) The other plan to be compared.
+ Plan other_plan = 2;
+ }
}
// Response to performing analysis of the query. Contains relevant metadata to be able to
@@ -161,6 +172,7 @@ message AnalyzePlanResponse {
InputFiles input_files = 7;
SparkVersion spark_version = 8;
DDLParse ddl_parse = 9;
+ SameSemantics same_semantics = 10;
}
message Schema {
@@ -195,6 +207,10 @@ message AnalyzePlanResponse {
message DDLParse {
DataType parsed = 1;
}
+
+ message SameSemantics {
+ bool result = 1;
+ }
}
// A request to be executed by the service.
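For reference, a sketch of how the new oneof case is populated on the wire from Python, mirroring the generated bindings regenerated below; `plan` and `other` are assumed to be pre-built spark.connect.Plan messages:

    from pyspark.sql.connect.proto import base_pb2 as pb2

    req = pb2.AnalyzePlanRequest()
    req.client_id = "my-session-id"  # illustrative session identifier
    # Setting a sub-message selects the same_semantics case of the analyze oneof.
    req.same_semantics.target_plan.CopyFrom(plan)
    req.same_semantics.other_plan.CopyFrom(other)
    assert req.WhichOneof("analyze") == "same_semantics"

    # The server replies with the matching response case:
    #   response.same_semantics.result  ->  bool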
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index f6adcd852e8..e3d4da66a08 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -140,6 +140,18 @@ private[connect] class SparkConnectAnalyzeHandler(
.setParsed(DataTypeProtoConverter.toConnectProtoType(schema))
.build())
+ case proto.AnalyzePlanRequest.AnalyzeCase.SAME_SEMANTICS =>
+ val target = Dataset.ofRows(
+ session,
+ planner.transformRelation(request.getSameSemantics.getTargetPlan.getRoot))
+ val other = Dataset.ofRows(
+ session,
+ planner.transformRelation(request.getSameSemantics.getOtherPlan.getRoot))
+ builder.setSameSemantics(
+ proto.AnalyzePlanResponse.SameSemantics
+ .newBuilder()
+ .setResult(target.sameSemantics(other)))
+
case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
}
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 8be0c997538..2594640aa3e 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -401,6 +401,7 @@ class AnalyzeResult:
input_files: Optional[List[str]],
spark_version: Optional[str],
parsed: Optional[pb2.DataType],
+ is_same_semantics: Optional[bool],
):
self.schema = schema
self.explain_string = explain_string
@@ -410,6 +411,7 @@ class AnalyzeResult:
self.input_files = input_files
self.spark_version = spark_version
self.parsed = parsed
+ self.is_same_semantics = is_same_semantics
@classmethod
def fromProto(cls, pb: Any) -> "AnalyzeResult":
@@ -421,6 +423,7 @@ class AnalyzeResult:
input_files: Optional[List[str]] = None
spark_version: Optional[str] = None
parsed: Optional[pb2.DataType] = None
+ is_same_semantics: Optional[bool] = None
if pb.HasField("schema"):
schema = pb.schema.schema
@@ -438,6 +441,8 @@ class AnalyzeResult:
spark_version = pb.spark_version.version
elif pb.HasField("ddl_parse"):
parsed = pb.ddl_parse.parsed
+ elif pb.HasField("same_semantics"):
+ is_same_semantics = pb.same_semantics.result
else:
raise SparkConnectException("No analyze result found!")
@@ -450,6 +455,7 @@ class AnalyzeResult:
input_files,
spark_version,
parsed,
+ is_same_semantics,
)
@@ -690,6 +696,14 @@ class SparkConnectClient(object):
else:
return (None, properties)
+ def same_semantics(self, plan: pb2.Plan, other: pb2.Plan) -> bool:
+ """
+ Return whether the two plans have the same semantics.
+ """
+ result = self._analyze(method="same_semantics", plan=plan, other=other).is_same_semantics
+ assert result is not None
+ return result
+
def close(self) -> None:
"""
Close the channel.
@@ -765,6 +779,9 @@ class SparkConnectClient(object):
req.spark_version.SetInParent()
elif method == "ddl_parse":
req.ddl_parse.ddl_string = cast(str, kwargs.get("ddl_string"))
+ elif method == "same_semantics":
+ req.same_semantics.target_plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+ req.same_semantics.other_plan.CopyFrom(cast(pb2.Plan, kwargs.get("other")))
else:
raise ValueError(f"Unknown Analyze method: {method}")
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index fdc71d46632..38e245f0335 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1610,8 +1610,15 @@ class DataFrame:
def semanticHash(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("semanticHash() is not implemented.")
- def sameSemantics(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("sameSemantics() is not implemented.")
+ def sameSemantics(self, other: "DataFrame") -> bool:
+ assert self._plan is not None
+ assert other._plan is not None
+ return self._session.client.same_semantics(
+ plan=self._plan.to_proto(self._session.client),
+ other=other._plan.to_proto(other._session.client),
+ )
+
+ sameSemantics.__doc__ = PySparkDataFrame.sameSemantics.__doc__
def writeTo(self, table: str) -> "DataFrameWriterV2":
assert self._plan is not None
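As the Scala doc comment notes, the false-negative-but-never-false-positive contract makes sameSemantics safe for deciding whether a previously materialized result can be reused. A hypothetical caching helper, purely illustrative and not part of this change:

    cached = []  # list of (DataFrame, collected rows); illustrative only

    def get_or_compute(df):
        # Reuse is safe: sameSemantics never returns True for two plans
        # that could produce different results.
        for cached_df, rows in cached:
            if df.sameSemantics(cached_df):
                return rows
        rows = df.collect()
        cached.append((df, rows))
        return rows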
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 9ece82ed535..6d41ce28c7c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
)
@@ -52,6 +52,7 @@ _ANALYZEPLANREQUEST_ISSTREAMING = _ANALYZEPLANREQUEST.nested_types_by_name["IsSt
_ANALYZEPLANREQUEST_INPUTFILES = _ANALYZEPLANREQUEST.nested_types_by_name["InputFiles"]
_ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["SparkVersion"]
_ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"]
+_ANALYZEPLANREQUEST_SAMESEMANTICS = _ANALYZEPLANREQUEST.nested_types_by_name["SameSemantics"]
_ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"]
_ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"]
_ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"]
@@ -61,6 +62,7 @@ _ANALYZEPLANRESPONSE_ISSTREAMING = _ANALYZEPLANRESPONSE.nested_types_by_name["Is
_ANALYZEPLANRESPONSE_INPUTFILES = _ANALYZEPLANRESPONSE.nested_types_by_name["InputFiles"]
_ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["SparkVersion"]
_ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"]
+_ANALYZEPLANRESPONSE_SAMESEMANTICS = _ANALYZEPLANRESPONSE.nested_types_by_name["SameSemantics"]
_EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
_EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
_EXECUTEPLANRESPONSE_SQLCOMMANDRESULT = _EXECUTEPLANRESPONSE.nested_types_by_name[
@@ -203,6 +205,15 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.DDLParse)
},
),
+ "SameSemantics": _reflection.GeneratedProtocolMessageType(
+ "SameSemantics",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANREQUEST_SAMESEMANTICS,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SameSemantics)
+ },
+ ),
"DESCRIPTOR": _ANALYZEPLANREQUEST,
"__module__": "spark.connect.base_pb2"
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest)
@@ -217,6 +228,7 @@ _sym_db.RegisterMessage(AnalyzePlanRequest.IsStreaming)
_sym_db.RegisterMessage(AnalyzePlanRequest.InputFiles)
_sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion)
_sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse)
+_sym_db.RegisterMessage(AnalyzePlanRequest.SameSemantics)
AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
"AnalyzePlanResponse",
@@ -294,6 +306,15 @@ AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.DDLParse)
},
),
+ "SameSemantics": _reflection.GeneratedProtocolMessageType(
+ "SameSemantics",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANRESPONSE_SAMESEMANTICS,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SameSemantics)
+ },
+ ),
"DESCRIPTOR": _ANALYZEPLANRESPONSE,
"__module__": "spark.connect.base_pb2"
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse)
@@ -308,6 +329,7 @@ _sym_db.RegisterMessage(AnalyzePlanResponse.IsStreaming)
_sym_db.RegisterMessage(AnalyzePlanResponse.InputFiles)
_sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion)
_sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse)
+_sym_db.RegisterMessage(AnalyzePlanResponse.SameSemantics)
ExecutePlanRequest = _reflection.GeneratedProtocolMessageType(
"ExecutePlanRequest",
@@ -598,97 +620,101 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_USERCONTEXT._serialized_start = 309
_USERCONTEXT._serialized_end = 431
_ANALYZEPLANREQUEST._serialized_start = 434
- _ANALYZEPLANREQUEST._serialized_end = 1876
- _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1205
- _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1254
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1257
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1572
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1400
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1572
- _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1574
- _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1627
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1629
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1679
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1681
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1735
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1737
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1790
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1792
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1806
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1808
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1849
- _ANALYZEPLANRESPONSE._serialized_start = 1879
- _ANALYZEPLANRESPONSE._serialized_end = 2949
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2558
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2615
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2617
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2665
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2667
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 2712
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 2714
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 2750
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 2752
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 2800
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 2802
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 2836
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 2838
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 2878
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 2880
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 2939
- _EXECUTEPLANREQUEST._serialized_start = 2952
- _EXECUTEPLANREQUEST._serialized_end = 3159
- _EXECUTEPLANRESPONSE._serialized_start = 3162
- _EXECUTEPLANRESPONSE._serialized_end = 4386
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3617
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 3688
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 3690
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 3751
- _EXECUTEPLANRESPONSE_METRICS._serialized_start = 3754
- _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4271
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 3849
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4181
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4058
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4181
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4183
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4271
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4273
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4369
- _KEYVALUE._serialized_start = 4388
- _KEYVALUE._serialized_end = 4453
- _CONFIGREQUEST._serialized_start = 4456
- _CONFIGREQUEST._serialized_end = 5482
- _CONFIGREQUEST_OPERATION._serialized_start = 4674
- _CONFIGREQUEST_OPERATION._serialized_end = 5172
- _CONFIGREQUEST_SET._serialized_start = 5174
- _CONFIGREQUEST_SET._serialized_end = 5226
- _CONFIGREQUEST_GET._serialized_start = 5228
- _CONFIGREQUEST_GET._serialized_end = 5253
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5255
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5318
- _CONFIGREQUEST_GETOPTION._serialized_start = 5320
- _CONFIGREQUEST_GETOPTION._serialized_end = 5351
- _CONFIGREQUEST_GETALL._serialized_start = 5353
- _CONFIGREQUEST_GETALL._serialized_end = 5401
- _CONFIGREQUEST_UNSET._serialized_start = 5403
- _CONFIGREQUEST_UNSET._serialized_end = 5430
- _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5432
- _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5466
- _CONFIGRESPONSE._serialized_start = 5484
- _CONFIGRESPONSE._serialized_end = 5604
- _ADDARTIFACTSREQUEST._serialized_start = 5607
- _ADDARTIFACTSREQUEST._serialized_end = 6422
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 5954
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6007
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6009
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6120
- _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6122
- _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6215
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6218
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6411
- _ADDARTIFACTSRESPONSE._serialized_start = 6425
- _ADDARTIFACTSRESPONSE._serialized_end = 6613
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6532
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6613
- _SPARKCONNECTSERVICE._serialized_start = 6616
- _SPARKCONNECTSERVICE._serialized_end = 6981
+ _ANALYZEPLANREQUEST._serialized_end = 2089
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1295
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1344
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1347
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1662
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1490
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1662
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1664
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1717
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1719
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1769
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1771
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1825
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1827
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1880
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1882
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1896
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1898
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 1939
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 1941
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2062
+ _ANALYZEPLANRESPONSE._serialized_start = 2092
+ _ANALYZEPLANRESPONSE._serialized_end = 3294
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 2862
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 2919
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 2921
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 2969
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 2971
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3016
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3018
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3054
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3056
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3104
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3106
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3140
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3142
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3182
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3184
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3243
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3245
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3284
+ _EXECUTEPLANREQUEST._serialized_start = 3297
+ _EXECUTEPLANREQUEST._serialized_end = 3504
+ _EXECUTEPLANRESPONSE._serialized_start = 3507
+ _EXECUTEPLANRESPONSE._serialized_end = 4731
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 3962
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4033
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4035
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4096
+ _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4099
+ _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4616
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4194
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4526
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4403
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4526
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4528
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4616
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4618
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 4714
+ _KEYVALUE._serialized_start = 4733
+ _KEYVALUE._serialized_end = 4798
+ _CONFIGREQUEST._serialized_start = 4801
+ _CONFIGREQUEST._serialized_end = 5827
+ _CONFIGREQUEST_OPERATION._serialized_start = 5019
+ _CONFIGREQUEST_OPERATION._serialized_end = 5517
+ _CONFIGREQUEST_SET._serialized_start = 5519
+ _CONFIGREQUEST_SET._serialized_end = 5571
+ _CONFIGREQUEST_GET._serialized_start = 5573
+ _CONFIGREQUEST_GET._serialized_end = 5598
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5600
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5663
+ _CONFIGREQUEST_GETOPTION._serialized_start = 5665
+ _CONFIGREQUEST_GETOPTION._serialized_end = 5696
+ _CONFIGREQUEST_GETALL._serialized_start = 5698
+ _CONFIGREQUEST_GETALL._serialized_end = 5746
+ _CONFIGREQUEST_UNSET._serialized_start = 5748
+ _CONFIGREQUEST_UNSET._serialized_end = 5775
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 5777
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 5811
+ _CONFIGRESPONSE._serialized_start = 5829
+ _CONFIGRESPONSE._serialized_end = 5949
+ _ADDARTIFACTSREQUEST._serialized_start = 5952
+ _ADDARTIFACTSREQUEST._serialized_end = 6767
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6299
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6352
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6354
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6465
+ _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6467
+ _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6560
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6563
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 6756
+ _ADDARTIFACTSRESPONSE._serialized_start = 6770
+ _ADDARTIFACTSRESPONSE._serialized_end = 6958
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 6877
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 6958
+ _SPARKCONNECTSERVICE._serialized_start = 6961
+ _SPARKCONNECTSERVICE._serialized_end = 7326
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 56e1d2e416f..2e9a877b658 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -318,6 +318,38 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["ddl_string", b"ddl_string"]
) -> None: ...
+ class SameSemantics(google.protobuf.message.Message):
+ """Returns `true` when the logical query plans are equal and therefore return same results."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TARGET_PLAN_FIELD_NUMBER: builtins.int
+ OTHER_PLAN_FIELD_NUMBER: builtins.int
+ @property
+ def target_plan(self) -> global___Plan:
+ """(Required) The plan to be compared."""
+ @property
+ def other_plan(self) -> global___Plan:
+ """(Required) The other plan to be compared."""
+ def __init__(
+ self,
+ *,
+ target_plan: global___Plan | None = ...,
+ other_plan: global___Plan | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "other_plan", b"other_plan", "target_plan", b"target_plan"
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "other_plan", b"other_plan", "target_plan", b"target_plan"
+ ],
+ ) -> None: ...
+
CLIENT_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -329,6 +361,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
INPUT_FILES_FIELD_NUMBER: builtins.int
SPARK_VERSION_FIELD_NUMBER: builtins.int
DDL_PARSE_FIELD_NUMBER: builtins.int
+ SAME_SEMANTICS_FIELD_NUMBER: builtins.int
client_id: builtins.str
"""(Required)
@@ -359,6 +392,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
def spark_version(self) -> global___AnalyzePlanRequest.SparkVersion: ...
@property
def ddl_parse(self) -> global___AnalyzePlanRequest.DDLParse: ...
+ @property
+ def same_semantics(self) -> global___AnalyzePlanRequest.SameSemantics: ...
def __init__(
self,
*,
@@ -373,6 +408,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
input_files: global___AnalyzePlanRequest.InputFiles | None = ...,
spark_version: global___AnalyzePlanRequest.SparkVersion | None = ...,
ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ...,
+ same_semantics: global___AnalyzePlanRequest.SameSemantics | None = ...,
) -> None: ...
def HasField(
self,
@@ -393,6 +429,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"is_local",
"is_streaming",
b"is_streaming",
+ "same_semantics",
+ b"same_semantics",
"schema",
b"schema",
"spark_version",
@@ -424,6 +462,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"is_local",
"is_streaming",
b"is_streaming",
+ "same_semantics",
+ b"same_semantics",
"schema",
b"schema",
"spark_version",
@@ -450,6 +490,7 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
"input_files",
"spark_version",
"ddl_parse",
+ "same_semantics",
] | None: ...
global___AnalyzePlanRequest = AnalyzePlanRequest
@@ -583,6 +624,20 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["parsed", b"parsed"]
) -> None: ...
+ class SameSemantics(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ RESULT_FIELD_NUMBER: builtins.int
+ result: builtins.bool
+ def __init__(
+ self,
+ *,
+ result: builtins.bool = ...,
+ ) -> None: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["result", b"result"]
+ ) -> None: ...
+
CLIENT_ID_FIELD_NUMBER: builtins.int
SCHEMA_FIELD_NUMBER: builtins.int
EXPLAIN_FIELD_NUMBER: builtins.int
@@ -592,6 +647,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
INPUT_FILES_FIELD_NUMBER: builtins.int
SPARK_VERSION_FIELD_NUMBER: builtins.int
DDL_PARSE_FIELD_NUMBER: builtins.int
+ SAME_SEMANTICS_FIELD_NUMBER: builtins.int
client_id: builtins.str
@property
def schema(self) -> global___AnalyzePlanResponse.Schema: ...
@@ -609,6 +665,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
def spark_version(self) -> global___AnalyzePlanResponse.SparkVersion: ...
@property
def ddl_parse(self) -> global___AnalyzePlanResponse.DDLParse: ...
+ @property
+ def same_semantics(self) -> global___AnalyzePlanResponse.SameSemantics: ...
def __init__(
self,
*,
@@ -621,6 +679,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
input_files: global___AnalyzePlanResponse.InputFiles | None = ...,
spark_version: global___AnalyzePlanResponse.SparkVersion | None = ...,
ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ...,
+ same_semantics: global___AnalyzePlanResponse.SameSemantics | None = ...,
) -> None: ...
def HasField(
self,
@@ -637,6 +696,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"is_streaming",
"result",
b"result",
+ "same_semantics",
+ b"same_semantics",
"schema",
b"schema",
"spark_version",
@@ -662,6 +723,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"is_streaming",
"result",
b"result",
+ "same_semantics",
+ b"same_semantics",
"schema",
b"schema",
"spark_version",
@@ -681,6 +744,7 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
"input_files",
"spark_version",
"ddl_parse",
+ "same_semantics",
] | None: ...
global___AnalyzePlanResponse = AnalyzePlanResponse
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 68ac8b1dd7c..806fe6e2329 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2809,6 +2809,11 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
self.spark.version,
)
+ def test_same_semantics(self):
+ plan = self.connect.sql("SELECT 1")
+ other = self.connect.sql("SELECT 1")
+ self.assertTrue(plan.sameSemantics(other))
+
def test_unsupported_functions(self):
# SPARK-41225: Disable unsupported functions.
df = self.connect.read.table(self.tbl_name)
@@ -2825,7 +2830,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
"localCheckpoint",
"_repr_html_",
"semanticHash",
- "sameSemantics",
):
with self.assertRaises(NotImplementedError):
getattr(df, f)()
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 79626586f73..31dee6a19d2 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -60,8 +60,7 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
def test_repr_behaviors(self):
super().test_repr_behaviors()
- # TODO(SPARK-41874): Implement DataFrame `sameSemantics`
- @unittest.skip("Fails in Spark Connect, should enable.")
+ @unittest.skip("Spark Connect does not SparkContext but the tests depend on them.")
def test_same_semantics_error(self):
super().test_same_semantics_error()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org