You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@spark.apache.org by gu...@apache.org on 2023/03/22 01:18:34 UTC
[spark] branch branch-3.4 updated: [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5622981c885 [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
5622981c885 is described below
commit 5622981c88542bd1508a09fd0b45bcfb0f1fc136
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Wed Mar 22 10:18:10 2023 +0900
[SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
### What changes were proposed in this pull request?
Implements `DataFrame.cache`, `persist`, `unpersist`, and `storageLevel`.
### Why are the changes needed?
These DataFrame APIs were missing from Spark Connect; calling them raised NotImplementedError.
### Does this PR introduce _any_ user-facing change?
`DataFrame.cache`, `persist`, `unpersist`, and `storageLevel` will be available.
### How was this patch tested?
Added/enabled the related tests.
Closes #40510 from ueshin/issues/SPARK-42889/cache.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit b4f02248972c357cc2af6881b10565315ea15cb4)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../src/main/protobuf/spark/connect/base.proto | 50 ++++
.../common/StorageLevelProtoConverter.scala | 46 ++++
.../service/SparkConnectAnalyzeHandler.scala | 33 ++-
python/pyspark/sql/connect/client.py | 36 +++
python/pyspark/sql/connect/dataframe.py | 56 +++-
python/pyspark/sql/connect/proto/base_pb2.py | 298 ++++++++++++++-------
python/pyspark/sql/connect/proto/base_pb2.pyi | 228 ++++++++++++++++
python/pyspark/sql/dataframe.py | 20 ++
.../sql/tests/connect/test_connect_basic.py | 3 -
python/pyspark/sql/tests/test_dataframe.py | 26 +-
python/pyspark/storagelevel.py | 14 +-
11 files changed, 692 insertions(+), 118 deletions(-)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index da0f974a749..591f32cea1b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -54,6 +54,20 @@ message UserContext {
repeated google.protobuf.Any extensions = 999;
}
+// StorageLevel for persisting Datasets/Tables; mirrors the fields of Spark's StorageLevel.
+message StorageLevel {
+ // (Required) Whether the cache should use disk or not.
+ bool use_disk = 1;
+ // (Required) Whether the cache should use memory or not.
+ bool use_memory = 2;
+ // (Required) Whether the cache should use off-heap memory or not.
+ bool use_off_heap = 3;
+ // (Required) Whether the cached data is kept deserialized or not.
+ bool deserialized = 4;
+ // (Required) The number of replicas. Always set it: proto3 reads an unset value as 0.
+ int32 replication = 5;
+}
+
// Request to perform plan analyze, optionally to explain the plan.
message AnalyzePlanRequest {
// (Required)
@@ -82,6 +96,9 @@ message AnalyzePlanRequest {
DDLParse ddl_parse = 11;
SameSemantics same_semantics = 12;
SemanticHash semantic_hash = 13;
+ Persist persist = 14;
+ Unpersist unpersist = 15;
+ GetStorageLevel get_storage_level = 16;
}
message Schema {
@@ -163,6 +180,27 @@ message AnalyzePlanRequest {
// (Required) The logical plan to get a hashCode.
Plan plan = 1;
}
+
+ message Persist {
+ // (Required) The logical plan to persist.
+ Relation relation = 1;
+
+ // (Optional) The storage level. If unset, the server calls Dataset.persist() with its default level.
+ optional StorageLevel storage_level = 2;
+ }
+
+ message Unpersist {
+ // (Required) The logical plan to unpersist.
+ Relation relation = 1;
+
+ // (Optional) Whether to block until all blocks are deleted. If unset, the server calls Dataset.unpersist() with its default.
+ optional bool blocking = 2;
+ }
+
+ message GetStorageLevel {
+ // (Required) The logical plan whose current storage level is requested.
+ Relation relation = 1;
+ }
}
// Response to performing analysis of the query. Contains relevant metadata to be able to
@@ -181,6 +219,9 @@ message AnalyzePlanResponse {
DDLParse ddl_parse = 9;
SameSemantics same_semantics = 10;
SemanticHash semantic_hash = 11;
+ Persist persist = 12;
+ Unpersist unpersist = 13;
+ GetStorageLevel get_storage_level = 14;
}
message Schema {
@@ -223,6 +264,15 @@ message AnalyzePlanResponse {
message SemanticHash {
int32 result = 1;
}
+
+ message Persist { } // Empty acknowledgement: persist returns no payload.
+
+ message Unpersist { } // Empty acknowledgement: unpersist returns no payload.
+
+ message GetStorageLevel {
+ // (Required) The relation's current storage level, as reported by Dataset.storageLevel on the server.
+ StorageLevel storage_level = 1;
+ }
}
// A request to be executed by the service.
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala
new file mode 100644
index 00000000000..7bf273843b5
--- /dev/null
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import org.apache.spark.connect.proto
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Helper object for conversions between [[StorageLevel]] and [[proto.StorageLevel]].
+ */
+object StorageLevelProtoConverter {
+ def toStorageLevel(sl: proto.StorageLevel): StorageLevel = { // proto -> Spark, field by field
+ StorageLevel(
+ useDisk = sl.getUseDisk,
+ useMemory = sl.getUseMemory,
+ useOffHeap = sl.getUseOffHeap,
+ deserialized = sl.getDeserialized,
+ replication = sl.getReplication)
+ }
+
+ def toConnectProtoType(sl: StorageLevel): proto.StorageLevel = { // Spark -> proto, inverse of toStorageLevel
+ proto.StorageLevel
+ .newBuilder()
+ .setUseDisk(sl.useDisk)
+ .setUseMemory(sl.useMemory)
+ .setUseOffHeap(sl.useOffHeap)
+ .setDeserialized(sl.deserialized)
+ .setReplication(sl.replication)
+ .build()
+ }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index 4697a1fd7d4..a03b827b60e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, StorageLevelProtoConverter}
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode}
@@ -161,6 +161,37 @@ private[connect] class SparkConnectAnalyzeHandler(
.newBuilder()
.setResult(semanticHash))
+ case proto.AnalyzePlanRequest.AnalyzeCase.PERSIST => // rebuild the Dataset from the relation and persist it
+ val target = Dataset
+ .ofRows(session, planner.transformRelation(request.getPersist.getRelation))
+ if (request.getPersist.hasStorageLevel) {
+ target.persist(
+ StorageLevelProtoConverter.toStorageLevel(request.getPersist.getStorageLevel))
+ } else {
+ target.persist() // no level sent: fall back to Dataset.persist()'s default
+ }
+ builder.setPersist(proto.AnalyzePlanResponse.Persist.newBuilder().build()) // empty acknowledgement
+
+ case proto.AnalyzePlanRequest.AnalyzeCase.UNPERSIST => // rebuild the Dataset from the relation and unpersist it
+ val target = Dataset
+ .ofRows(session, planner.transformRelation(request.getUnpersist.getRelation))
+ if (request.getUnpersist.hasBlocking) {
+ target.unpersist(request.getUnpersist.getBlocking)
+ } else {
+ target.unpersist() // blocking not sent: fall back to Dataset.unpersist()'s default
+ }
+ builder.setUnpersist(proto.AnalyzePlanResponse.Unpersist.newBuilder().build()) // empty acknowledgement
+
+ case proto.AnalyzePlanRequest.AnalyzeCase.GET_STORAGE_LEVEL => // report the relation's current storage level
+ val target = Dataset
+ .ofRows(session, planner.transformRelation(request.getGetStorageLevel.getRelation))
+ val storageLevel = target.storageLevel
+ builder.setGetStorageLevel(
+ proto.AnalyzePlanResponse.GetStorageLevel
+ .newBuilder()
+ .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel))
+ .build())
+
case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
}
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 4db852c951e..6de78e72af8 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -74,6 +74,7 @@ from pyspark.sql.connect.expressions import (
from pyspark.sql.pandas.types import _check_series_localize_timestamps, _convert_map_items_to_dict
from pyspark.sql.types import DataType, MapType, StructType, TimestampType
from pyspark.rdd import PythonEvalType
+from pyspark.storagelevel import StorageLevel
if TYPE_CHECKING:
@@ -413,6 +414,7 @@ class AnalyzeResult:
parsed: Optional[DataType],
is_same_semantics: Optional[bool],
semantic_hash: Optional[int],
+ storage_level: Optional[StorageLevel],
):
self.schema = schema
self.explain_string = explain_string
@@ -424,6 +426,7 @@ class AnalyzeResult:
self.parsed = parsed
self.is_same_semantics = is_same_semantics
self.semantic_hash = semantic_hash
+ self.storage_level = storage_level
@classmethod
def fromProto(cls, pb: Any) -> "AnalyzeResult":
@@ -437,6 +440,7 @@ class AnalyzeResult:
parsed: Optional[DataType] = None
is_same_semantics: Optional[bool] = None
semantic_hash: Optional[int] = None
+ storage_level: Optional[StorageLevel] = None
if pb.HasField("schema"):
schema = types.proto_schema_to_pyspark_data_type(pb.schema.schema)
@@ -458,6 +462,18 @@ class AnalyzeResult:
is_same_semantics = pb.same_semantics.result
elif pb.HasField("semantic_hash"):
semantic_hash = pb.semantic_hash.result
+ elif pb.HasField("persist"):
+ pass  # persist response is an empty acknowledgement; nothing to extract
+ elif pb.HasField("unpersist"):
+ pass  # unpersist response is an empty acknowledgement; nothing to extract
+ elif pb.HasField("get_storage_level"):
+ storage_level = StorageLevel(  # map proto StorageLevel fields onto pyspark's StorageLevel
+ useDisk=pb.get_storage_level.storage_level.use_disk,
+ useMemory=pb.get_storage_level.storage_level.use_memory,
+ useOffHeap=pb.get_storage_level.storage_level.use_off_heap,
+ deserialized=pb.get_storage_level.storage_level.deserialized,
+ replication=pb.get_storage_level.storage_level.replication,
+ )
else:
raise SparkConnectException("No analyze result found!")
@@ -472,6 +488,7 @@ class AnalyzeResult:
parsed,
is_same_semantics,
semantic_hash,
+ storage_level,
)
@@ -820,6 +837,25 @@ class SparkConnectClient(object):
req.same_semantics.other_plan.CopyFrom(cast(pb2.Plan, kwargs.get("other")))
elif method == "semantic_hash":
req.semantic_hash.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+ elif method == "persist":
+ req.persist.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
+ if kwargs.get("storage_level", None) is not None:  # omitted -> field left unset, server uses its default
+ storage_level = cast(StorageLevel, kwargs.get("storage_level"))
+ req.persist.storage_level.CopyFrom(
+ pb2.StorageLevel(
+ use_disk=storage_level.useDisk,
+ use_memory=storage_level.useMemory,
+ use_off_heap=storage_level.useOffHeap,
+ deserialized=storage_level.deserialized,
+ replication=storage_level.replication,
+ )
+ )
+ elif method == "unpersist":
+ req.unpersist.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
+ if kwargs.get("blocking", None) is not None:  # omitted -> field left unset, server uses its default
+ req.unpersist.blocking = cast(bool, kwargs.get("blocking"))
+ elif method == "get_storage_level":
+ req.get_storage_level.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
else:
raise ValueError(f"Unknown Analyze method: {method}")
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index f9ef561373d..2dfc8e72193 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -53,6 +53,7 @@ from pyspark.sql.dataframe import (
from pyspark.errors import PySparkTypeError, PySparkAttributeError
from pyspark.errors.exceptions.connect import SparkConnectException
from pyspark.rdd import PythonEvalType
+from pyspark.storagelevel import StorageLevel
import pyspark.sql.connect.plan as plan
from pyspark.sql.connect.group import GroupedData
from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2
@@ -1538,14 +1539,54 @@ class DataFrame:
def rdd(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("RDD Support for Spark Connect is not implemented.")
- def unpersist(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("unpersist() is not implemented.")
+ def cache(self) -> "DataFrame":
+ if self._plan is None:
+ raise Exception("Cannot cache on empty plan.")
+ relation = self._plan.plan(self._session.client)
+ self._session.client._analyze(method="persist", relation=relation)  # no level: server default
+ return self
+
+ cache.__doc__ = PySparkDataFrame.cache.__doc__
+
+ def persist(
+ self,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_DESER,
+ ) -> "DataFrame":
+ if self._plan is None:
+ raise Exception("Cannot persist on empty plan.")
+ relation = self._plan.plan(self._session.client)
+ self._session.client._analyze(
+ method="persist", relation=relation, storage_level=storageLevel
+ )
+ return self
+
+ persist.__doc__ = PySparkDataFrame.persist.__doc__
- def cache(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("cache() is not implemented.")
+ @property
+ def storageLevel(self) -> StorageLevel:
+ if self._plan is None:
+ raise Exception("Cannot get storage level on empty plan.")
+ relation = self._plan.plan(self._session.client)
+ storage_level = self._session.client._analyze(
+ method="get_storage_level", relation=relation
+ ).storage_level
+ assert storage_level is not None  # fromProto fills it for get_storage_level responses
+ return storage_level
+
+ storageLevel.__doc__ = PySparkDataFrame.storageLevel.__doc__
+
+ def unpersist(self, blocking: bool = False) -> "DataFrame":
+ if self._plan is None:
+ raise Exception("Cannot unpersist on empty plan.")
+ relation = self._plan.plan(self._session.client)
+ self._session.client._analyze(method="unpersist", relation=relation, blocking=blocking)
+ return self
+
+ unpersist.__doc__ = PySparkDataFrame.unpersist.__doc__
- def persist(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("persist() is not implemented.")
+ @property
+ def is_cached(self) -> bool:
+ return self.storageLevel != StorageLevel.NONE  # each access issues a get_storage_level analyze RPC
def withWatermark(self, *args: Any, **kwargs: Any) -> None:
raise NotImplementedError("withWatermark() is not implemented.")
@@ -1577,9 +1618,6 @@ class DataFrame:
registerTempTable.__doc__ = PySparkDataFrame.registerTempTable.__doc__
- def storageLevel(self, *args: Any, **kwargs: Any) -> None:
- raise NotImplementedError("storageLevel() is not implemented.")
-
def _map_partitions(
self,
func: "PandasMapIterFunction",
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 36557344893..a5222eca045 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,12 +37,13 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+ b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
)
_PLAN = DESCRIPTOR.message_types_by_name["Plan"]
_USERCONTEXT = DESCRIPTOR.message_types_by_name["UserContext"]
+_STORAGELEVEL = DESCRIPTOR.message_types_by_name["StorageLevel"]
_ANALYZEPLANREQUEST = DESCRIPTOR.message_types_by_name["AnalyzePlanRequest"]
_ANALYZEPLANREQUEST_SCHEMA = _ANALYZEPLANREQUEST.nested_types_by_name["Schema"]
_ANALYZEPLANREQUEST_EXPLAIN = _ANALYZEPLANREQUEST.nested_types_by_name["Explain"]
@@ -54,6 +55,9 @@ _ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["Spa
_ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"]
_ANALYZEPLANREQUEST_SAMESEMANTICS = _ANALYZEPLANREQUEST.nested_types_by_name["SameSemantics"]
_ANALYZEPLANREQUEST_SEMANTICHASH = _ANALYZEPLANREQUEST.nested_types_by_name["SemanticHash"]
+_ANALYZEPLANREQUEST_PERSIST = _ANALYZEPLANREQUEST.nested_types_by_name["Persist"]
+_ANALYZEPLANREQUEST_UNPERSIST = _ANALYZEPLANREQUEST.nested_types_by_name["Unpersist"]
+_ANALYZEPLANREQUEST_GETSTORAGELEVEL = _ANALYZEPLANREQUEST.nested_types_by_name["GetStorageLevel"]
_ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"]
_ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"]
_ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"]
@@ -65,6 +69,9 @@ _ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["S
_ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"]
_ANALYZEPLANRESPONSE_SAMESEMANTICS = _ANALYZEPLANRESPONSE.nested_types_by_name["SameSemantics"]
_ANALYZEPLANRESPONSE_SEMANTICHASH = _ANALYZEPLANRESPONSE.nested_types_by_name["SemanticHash"]
+_ANALYZEPLANRESPONSE_PERSIST = _ANALYZEPLANRESPONSE.nested_types_by_name["Persist"]
+_ANALYZEPLANRESPONSE_UNPERSIST = _ANALYZEPLANRESPONSE.nested_types_by_name["Unpersist"]
+_ANALYZEPLANRESPONSE_GETSTORAGELEVEL = _ANALYZEPLANRESPONSE.nested_types_by_name["GetStorageLevel"]
_EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
_EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
_EXECUTEPLANRESPONSE_SQLCOMMANDRESULT = _EXECUTEPLANRESPONSE.nested_types_by_name[
@@ -131,6 +138,17 @@ UserContext = _reflection.GeneratedProtocolMessageType(
)
_sym_db.RegisterMessage(UserContext)
+StorageLevel = _reflection.GeneratedProtocolMessageType(
+ "StorageLevel",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _STORAGELEVEL,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.StorageLevel)
+ },
+)
+_sym_db.RegisterMessage(StorageLevel)
+
AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
"AnalyzePlanRequest",
(_message.Message,),
@@ -225,6 +243,33 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SemanticHash)
},
),
+ "Persist": _reflection.GeneratedProtocolMessageType(
+ "Persist",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANREQUEST_PERSIST,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Persist)
+ },
+ ),
+ "Unpersist": _reflection.GeneratedProtocolMessageType(
+ "Unpersist",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANREQUEST_UNPERSIST,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Unpersist)
+ },
+ ),
+ "GetStorageLevel": _reflection.GeneratedProtocolMessageType(
+ "GetStorageLevel",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANREQUEST_GETSTORAGELEVEL,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.GetStorageLevel)
+ },
+ ),
"DESCRIPTOR": _ANALYZEPLANREQUEST,
"__module__": "spark.connect.base_pb2"
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest)
@@ -241,6 +286,9 @@ _sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion)
_sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse)
_sym_db.RegisterMessage(AnalyzePlanRequest.SameSemantics)
_sym_db.RegisterMessage(AnalyzePlanRequest.SemanticHash)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Persist)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Unpersist)
+_sym_db.RegisterMessage(AnalyzePlanRequest.GetStorageLevel)
AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
"AnalyzePlanResponse",
@@ -336,6 +384,33 @@ AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SemanticHash)
},
),
+ "Persist": _reflection.GeneratedProtocolMessageType(
+ "Persist",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANRESPONSE_PERSIST,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Persist)
+ },
+ ),
+ "Unpersist": _reflection.GeneratedProtocolMessageType(
+ "Unpersist",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANRESPONSE_UNPERSIST,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Unpersist)
+ },
+ ),
+ "GetStorageLevel": _reflection.GeneratedProtocolMessageType(
+ "GetStorageLevel",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _ANALYZEPLANRESPONSE_GETSTORAGELEVEL,
+ "__module__": "spark.connect.base_pb2"
+ # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.GetStorageLevel)
+ },
+ ),
"DESCRIPTOR": _ANALYZEPLANRESPONSE,
"__module__": "spark.connect.base_pb2"
# @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse)
@@ -352,6 +427,9 @@ _sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion)
_sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse)
_sym_db.RegisterMessage(AnalyzePlanResponse.SameSemantics)
_sym_db.RegisterMessage(AnalyzePlanResponse.SemanticHash)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Persist)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Unpersist)
+_sym_db.RegisterMessage(AnalyzePlanResponse.GetStorageLevel)
ExecutePlanRequest = _reflection.GeneratedProtocolMessageType(
"ExecutePlanRequest",
@@ -641,106 +719,120 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_PLAN._serialized_end = 307
_USERCONTEXT._serialized_start = 309
_USERCONTEXT._serialized_end = 431
- _ANALYZEPLANREQUEST._serialized_start = 434
- _ANALYZEPLANREQUEST._serialized_end = 2235
- _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1384
- _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1433
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1436
- _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1751
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1579
- _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1751
- _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1753
- _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1806
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1808
- _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1858
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1860
- _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1914
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1916
- _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1969
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1971
- _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1985
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1987
- _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2028
- _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2030
- _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2151
- _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2153
- _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2208
- _ANALYZEPLANRESPONSE._serialized_start = 2238
- _ANALYZEPLANRESPONSE._serialized_end = 3570
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 3098
- _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 3155
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 3157
- _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 3205
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 3207
- _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3252
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3254
- _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3290
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3292
- _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3340
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3342
- _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3376
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3378
- _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3418
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3420
- _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3479
- _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3481
- _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3520
- _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 3522
- _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 3560
- _EXECUTEPLANREQUEST._serialized_start = 3573
- _EXECUTEPLANREQUEST._serialized_end = 3782
- _EXECUTEPLANRESPONSE._serialized_start = 3785
- _EXECUTEPLANRESPONSE._serialized_end = 5060
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 4291
- _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4362
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4364
- _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4425
- _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4428
- _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4945
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4523
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4855
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4732
- _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4855
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4857
- _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4945
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4947
- _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 5043
- _KEYVALUE._serialized_start = 5062
- _KEYVALUE._serialized_end = 5127
- _CONFIGREQUEST._serialized_start = 5130
- _CONFIGREQUEST._serialized_end = 6158
- _CONFIGREQUEST_OPERATION._serialized_start = 5350
- _CONFIGREQUEST_OPERATION._serialized_end = 5848
- _CONFIGREQUEST_SET._serialized_start = 5850
- _CONFIGREQUEST_SET._serialized_end = 5902
- _CONFIGREQUEST_GET._serialized_start = 5904
- _CONFIGREQUEST_GET._serialized_end = 5929
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5931
- _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5994
- _CONFIGREQUEST_GETOPTION._serialized_start = 5996
- _CONFIGREQUEST_GETOPTION._serialized_end = 6027
- _CONFIGREQUEST_GETALL._serialized_start = 6029
- _CONFIGREQUEST_GETALL._serialized_end = 6077
- _CONFIGREQUEST_UNSET._serialized_start = 6079
- _CONFIGREQUEST_UNSET._serialized_end = 6106
- _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 6108
- _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 6142
- _CONFIGRESPONSE._serialized_start = 6160
- _CONFIGRESPONSE._serialized_end = 6282
- _ADDARTIFACTSREQUEST._serialized_start = 6285
- _ADDARTIFACTSREQUEST._serialized_end = 7156
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6672
- _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6725
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6727
- _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6838
- _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6840
- _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6933
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6936
- _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 7129
- _ADDARTIFACTSRESPONSE._serialized_start = 7159
- _ADDARTIFACTSRESPONSE._serialized_end = 7347
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 7266
- _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 7347
- _SPARKCONNECTSERVICE._serialized_start = 7350
- _SPARKCONNECTSERVICE._serialized_end = 7715
+ _STORAGELEVEL._serialized_start = 434
+ _STORAGELEVEL._serialized_end = 610
+ _ANALYZEPLANREQUEST._serialized_start = 613
+ _ANALYZEPLANREQUEST._serialized_end = 2997
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1808
+ _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1857
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1860
+ _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2175
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 2003
+ _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2175
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2177
+ _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2230
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2232
+ _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2282
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2284
+ _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2338
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2340
+ _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2393
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2395
+ _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2409
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2411
+ _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2452
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2454
+ _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2575
+ _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2577
+ _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2632
+ _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2635
+ _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2786
+ _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2788
+ _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2898
+ _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2900
+ _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2970
+ _ANALYZEPLANRESPONSE._serialized_start = 3000
+ _ANALYZEPLANRESPONSE._serialized_end = 4689
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4108
+ _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4165
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 4167
+ _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4215
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4217
+ _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4262
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4264
+ _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4300
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4302
+ _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4350
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4352
+ _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4386
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4388
+ _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4428
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4430
+ _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4489
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4491
+ _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4530
+ _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4532
+ _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4570
+ _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2635
+ _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2644
+ _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2788
+ _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2799
+ _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4596
+ _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4679
+ _EXECUTEPLANREQUEST._serialized_start = 4692
+ _EXECUTEPLANREQUEST._serialized_end = 4901
+ _EXECUTEPLANRESPONSE._serialized_start = 4904
+ _EXECUTEPLANRESPONSE._serialized_end = 6179
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 5410
+ _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 5481
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 5483
+ _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 5544
+ _EXECUTEPLANRESPONSE_METRICS._serialized_start = 5547
+ _EXECUTEPLANRESPONSE_METRICS._serialized_end = 6064
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 5642
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 5974
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 5851
+ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 5974
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 5976
+ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 6064
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 6066
+ _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 6162
+ _KEYVALUE._serialized_start = 6181
+ _KEYVALUE._serialized_end = 6246
+ _CONFIGREQUEST._serialized_start = 6249
+ _CONFIGREQUEST._serialized_end = 7277
+ _CONFIGREQUEST_OPERATION._serialized_start = 6469
+ _CONFIGREQUEST_OPERATION._serialized_end = 6967
+ _CONFIGREQUEST_SET._serialized_start = 6969
+ _CONFIGREQUEST_SET._serialized_end = 7021
+ _CONFIGREQUEST_GET._serialized_start = 7023
+ _CONFIGREQUEST_GET._serialized_end = 7048
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 7050
+ _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 7113
+ _CONFIGREQUEST_GETOPTION._serialized_start = 7115
+ _CONFIGREQUEST_GETOPTION._serialized_end = 7146
+ _CONFIGREQUEST_GETALL._serialized_start = 7148
+ _CONFIGREQUEST_GETALL._serialized_end = 7196
+ _CONFIGREQUEST_UNSET._serialized_start = 7198
+ _CONFIGREQUEST_UNSET._serialized_end = 7225
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 7227
+ _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 7261
+ _CONFIGRESPONSE._serialized_start = 7279
+ _CONFIGRESPONSE._serialized_end = 7401
+ _ADDARTIFACTSREQUEST._serialized_start = 7404
+ _ADDARTIFACTSREQUEST._serialized_end = 8275
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 7791
+ _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 7844
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 7846
+ _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 7957
+ _ADDARTIFACTSREQUEST_BATCH._serialized_start = 7959
+ _ADDARTIFACTSREQUEST_BATCH._serialized_end = 8052
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 8055
+ _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 8248
+ _ADDARTIFACTSRESPONSE._serialized_start = 8278
+ _ADDARTIFACTSRESPONSE._serialized_end = 8466
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 8385
+ _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 8466
+ _SPARKCONNECTSERVICE._serialized_start = 8469
+ _SPARKCONNECTSERVICE._serialized_end = 8834
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 4c020308d9a..8c1f9f09d61 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -132,6 +132,53 @@ class UserContext(google.protobuf.message.Message):
global___UserContext = UserContext
+class StorageLevel(google.protobuf.message.Message):
+ """StorageLevel for persisting Datasets/Tables."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ USE_DISK_FIELD_NUMBER: builtins.int
+ USE_MEMORY_FIELD_NUMBER: builtins.int
+ USE_OFF_HEAP_FIELD_NUMBER: builtins.int
+ DESERIALIZED_FIELD_NUMBER: builtins.int
+ REPLICATION_FIELD_NUMBER: builtins.int
+ use_disk: builtins.bool
+ """(Required) Whether the cache should use disk or not."""
+ use_memory: builtins.bool
+ """(Required) Whether the cache should use memory or not."""
+ use_off_heap: builtins.bool
+ """(Required) Whether the cache should use off-heap or not."""
+ deserialized: builtins.bool
+ """(Required) Whether the cached data is deserialized or not."""
+ replication: builtins.int
+ """(Required) The number of replicas."""
+ def __init__(
+ self,
+ *,
+ use_disk: builtins.bool = ...,
+ use_memory: builtins.bool = ...,
+ use_off_heap: builtins.bool = ...,
+ deserialized: builtins.bool = ...,
+ replication: builtins.int = ...,
+ ) -> None: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "deserialized",
+ b"deserialized",
+ "replication",
+ b"replication",
+ "use_disk",
+ b"use_disk",
+ "use_memory",
+ b"use_memory",
+ "use_off_heap",
+ b"use_off_heap",
+ ],
+ ) -> None: ...
+
+global___StorageLevel = StorageLevel
+
class AnalyzePlanRequest(google.protobuf.message.Message):
"""Request to perform plan analyze, optionally to explain the plan."""
@@ -367,6 +414,100 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
+ class Persist(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ RELATION_FIELD_NUMBER: builtins.int
+ STORAGE_LEVEL_FIELD_NUMBER: builtins.int
+ @property
+ def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+ """(Required) The logical plan to persist."""
+ @property
+ def storage_level(self) -> global___StorageLevel:
+ """(Optional) The storage level."""
+ def __init__(
+ self,
+ *,
+ relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+ storage_level: global___StorageLevel | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_storage_level",
+ b"_storage_level",
+ "relation",
+ b"relation",
+ "storage_level",
+ b"storage_level",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_storage_level",
+ b"_storage_level",
+ "relation",
+ b"relation",
+ "storage_level",
+ b"storage_level",
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_storage_level", b"_storage_level"]
+ ) -> typing_extensions.Literal["storage_level"] | None: ...
+
+ class Unpersist(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ RELATION_FIELD_NUMBER: builtins.int
+ BLOCKING_FIELD_NUMBER: builtins.int
+ @property
+ def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+ """(Required) The logical plan to unpersist."""
+ blocking: builtins.bool
+ """(Optional) Whether to block until all blocks are deleted."""
+ def __init__(
+ self,
+ *,
+ relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+ blocking: builtins.bool | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_blocking", b"_blocking", "blocking", b"blocking", "relation", b"relation"
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_blocking", b"_blocking", "blocking", b"blocking", "relation", b"relation"
+ ],
+ ) -> None: ...
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_blocking", b"_blocking"]
+ ) -> typing_extensions.Literal["blocking"] | None: ...
+
+ class GetStorageLevel(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ RELATION_FIELD_NUMBER: builtins.int
+ @property
+ def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+ """(Required) The logical plan to get the storage level."""
+ def __init__(
+ self,
+ *,
+ relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["relation", b"relation"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["relation", b"relation"]
+ ) -> None: ...
+
SESSION_ID_FIELD_NUMBER: builtins.int
USER_CONTEXT_FIELD_NUMBER: builtins.int
CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -380,6 +521,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
DDL_PARSE_FIELD_NUMBER: builtins.int
SAME_SEMANTICS_FIELD_NUMBER: builtins.int
SEMANTIC_HASH_FIELD_NUMBER: builtins.int
+ PERSIST_FIELD_NUMBER: builtins.int
+ UNPERSIST_FIELD_NUMBER: builtins.int
+ GET_STORAGE_LEVEL_FIELD_NUMBER: builtins.int
session_id: builtins.str
"""(Required)
@@ -415,6 +559,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
def same_semantics(self) -> global___AnalyzePlanRequest.SameSemantics: ...
@property
def semantic_hash(self) -> global___AnalyzePlanRequest.SemanticHash: ...
+ @property
+ def persist(self) -> global___AnalyzePlanRequest.Persist: ...
+ @property
+ def unpersist(self) -> global___AnalyzePlanRequest.Unpersist: ...
+ @property
+ def get_storage_level(self) -> global___AnalyzePlanRequest.GetStorageLevel: ...
def __init__(
self,
*,
@@ -431,6 +581,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ...,
same_semantics: global___AnalyzePlanRequest.SameSemantics | None = ...,
semantic_hash: global___AnalyzePlanRequest.SemanticHash | None = ...,
+ persist: global___AnalyzePlanRequest.Persist | None = ...,
+ unpersist: global___AnalyzePlanRequest.Unpersist | None = ...,
+ get_storage_level: global___AnalyzePlanRequest.GetStorageLevel | None = ...,
) -> None: ...
def HasField(
self,
@@ -445,12 +598,16 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"ddl_parse",
"explain",
b"explain",
+ "get_storage_level",
+ b"get_storage_level",
"input_files",
b"input_files",
"is_local",
b"is_local",
"is_streaming",
b"is_streaming",
+ "persist",
+ b"persist",
"same_semantics",
b"same_semantics",
"schema",
@@ -461,6 +618,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"spark_version",
"tree_string",
b"tree_string",
+ "unpersist",
+ b"unpersist",
"user_context",
b"user_context",
],
@@ -478,12 +637,16 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"ddl_parse",
"explain",
b"explain",
+ "get_storage_level",
+ b"get_storage_level",
"input_files",
b"input_files",
"is_local",
b"is_local",
"is_streaming",
b"is_streaming",
+ "persist",
+ b"persist",
"same_semantics",
b"same_semantics",
"schema",
@@ -496,6 +659,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
b"spark_version",
"tree_string",
b"tree_string",
+ "unpersist",
+ b"unpersist",
"user_context",
b"user_context",
],
@@ -518,6 +683,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
"ddl_parse",
"same_semantics",
"semantic_hash",
+ "persist",
+ "unpersist",
+ "get_storage_level",
] | None: ...
global___AnalyzePlanRequest = AnalyzePlanRequest
@@ -679,6 +847,39 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
self, field_name: typing_extensions.Literal["result", b"result"]
) -> None: ...
+ class Persist(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+ class Unpersist(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ def __init__(
+ self,
+ ) -> None: ...
+
+ class GetStorageLevel(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ STORAGE_LEVEL_FIELD_NUMBER: builtins.int
+ @property
+ def storage_level(self) -> global___StorageLevel:
+ """(Required) The StorageLevel as a result of get_storage_level request."""
+ def __init__(
+ self,
+ *,
+ storage_level: global___StorageLevel | None = ...,
+ ) -> None: ...
+ def HasField(
+ self, field_name: typing_extensions.Literal["storage_level", b"storage_level"]
+ ) -> builtins.bool: ...
+ def ClearField(
+ self, field_name: typing_extensions.Literal["storage_level", b"storage_level"]
+ ) -> None: ...
+
SESSION_ID_FIELD_NUMBER: builtins.int
SCHEMA_FIELD_NUMBER: builtins.int
EXPLAIN_FIELD_NUMBER: builtins.int
@@ -690,6 +891,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
DDL_PARSE_FIELD_NUMBER: builtins.int
SAME_SEMANTICS_FIELD_NUMBER: builtins.int
SEMANTIC_HASH_FIELD_NUMBER: builtins.int
+ PERSIST_FIELD_NUMBER: builtins.int
+ UNPERSIST_FIELD_NUMBER: builtins.int
+ GET_STORAGE_LEVEL_FIELD_NUMBER: builtins.int
session_id: builtins.str
@property
def schema(self) -> global___AnalyzePlanResponse.Schema: ...
@@ -711,6 +915,12 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
def same_semantics(self) -> global___AnalyzePlanResponse.SameSemantics: ...
@property
def semantic_hash(self) -> global___AnalyzePlanResponse.SemanticHash: ...
+ @property
+ def persist(self) -> global___AnalyzePlanResponse.Persist: ...
+ @property
+ def unpersist(self) -> global___AnalyzePlanResponse.Unpersist: ...
+ @property
+ def get_storage_level(self) -> global___AnalyzePlanResponse.GetStorageLevel: ...
def __init__(
self,
*,
@@ -725,6 +935,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ...,
same_semantics: global___AnalyzePlanResponse.SameSemantics | None = ...,
semantic_hash: global___AnalyzePlanResponse.SemanticHash | None = ...,
+ persist: global___AnalyzePlanResponse.Persist | None = ...,
+ unpersist: global___AnalyzePlanResponse.Unpersist | None = ...,
+ get_storage_level: global___AnalyzePlanResponse.GetStorageLevel | None = ...,
) -> None: ...
def HasField(
self,
@@ -733,12 +946,16 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"ddl_parse",
"explain",
b"explain",
+ "get_storage_level",
+ b"get_storage_level",
"input_files",
b"input_files",
"is_local",
b"is_local",
"is_streaming",
b"is_streaming",
+ "persist",
+ b"persist",
"result",
b"result",
"same_semantics",
@@ -751,6 +968,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"spark_version",
"tree_string",
b"tree_string",
+ "unpersist",
+ b"unpersist",
],
) -> builtins.bool: ...
def ClearField(
@@ -760,12 +979,16 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"ddl_parse",
"explain",
b"explain",
+ "get_storage_level",
+ b"get_storage_level",
"input_files",
b"input_files",
"is_local",
b"is_local",
"is_streaming",
b"is_streaming",
+ "persist",
+ b"persist",
"result",
b"result",
"same_semantics",
@@ -780,6 +1003,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
b"spark_version",
"tree_string",
b"tree_string",
+ "unpersist",
+ b"unpersist",
],
) -> None: ...
def WhichOneof(
@@ -795,6 +1020,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
"ddl_parse",
"same_semantics",
"semantic_hash",
+ "persist",
+ "unpersist",
+ "get_storage_level",
] | None: ...
global___AnalyzePlanResponse = AnalyzePlanResponse
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index d520aa16999..b5e6e49fb05 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1421,6 +1421,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
.. versionadded:: 1.3.0
+ .. versionchanged:: 3.4.0
+ Supports Spark Connect.
+
Notes
-----
The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
@@ -1435,6 +1438,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> df = spark.range(1)
>>> df.cache()
DataFrame[id: bigint]
+
+ >>> df.explain()
+ == Physical Plan ==
+ InMemoryTableScan ...
"""
self.is_cached = True
self._jdf.cache()
@@ -1451,6 +1458,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
.. versionadded:: 1.3.0
+ .. versionchanged:: 3.4.0
+ Supports Spark Connect.
+
Notes
-----
The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0.
@@ -1471,6 +1481,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
>>> df.persist()
DataFrame[id: bigint]
+ >>> df.explain()
+ == Physical Plan ==
+ InMemoryTableScan ...
+
Persists the data in the disk by specifying the storage level.
>>> from pyspark.storagelevel import StorageLevel
@@ -1488,6 +1502,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
.. versionadded:: 2.1.0
+ .. versionchanged:: 3.4.0
+ Supports Spark Connect.
+
Returns
-------
:class:`StorageLevel`
@@ -1521,6 +1538,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
.. versionadded:: 1.3.0
+ .. versionchanged:: 3.4.0
+ Supports Spark Connect.
+
Notes
-----
`blocking` default has changed to ``False`` to match Scala in 2.0.
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 0a47dc193c9..f911ca9ba78 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2827,9 +2827,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
df = self.connect.read.table(self.tbl_name)
for f in (
"rdd",
- "unpersist",
- "cache",
- "persist",
"withWatermark",
"foreach",
"foreachPartition",
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index cb209f472bf..0ce32ec4abf 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -43,6 +43,7 @@ from pyspark.sql.types import (
FloatType,
DayTimeIntervalType,
)
+from pyspark.storagelevel import StorageLevel
from pyspark.errors import (
AnalysisException,
IllegalArgumentException,
@@ -1029,7 +1030,32 @@ class DataFrameTestsMixin:
# works with crossJoin
self.assertEqual(1, df1.crossJoin(df2).count())
- def test_cache(self):
+ def test_cache_dataframe(self):
+ df = self.spark.createDataFrame([(2, 2), (3, 3)])
+ try:
+ self.assertEqual(df.storageLevel, StorageLevel.NONE)
+ # Regression guard: StorageLevel equality must also compare `deserialized`.
+ self.assertNotEqual(StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK_DESER)
+
+ df.cache()
+ self.assertEqual(df.storageLevel, StorageLevel.MEMORY_AND_DISK_DESER)
+
+ df.unpersist()
+ self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+ df.persist()
+ self.assertEqual(df.storageLevel, StorageLevel.MEMORY_AND_DISK_DESER)
+
+ df.unpersist(blocking=True)
+ self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+ df.persist(StorageLevel.DISK_ONLY)
+ self.assertEqual(df.storageLevel, StorageLevel.DISK_ONLY)
+ finally:
+ df.unpersist()
+ self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+ def test_cache_table(self):
spark = self.spark
with self.tempView("tab1", "tab2"):
spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab1")
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 754e0b9fac4..c9a70fabdf7 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -17,7 +17,7 @@
__all__ = ["StorageLevel"]
-from typing import ClassVar
+from typing import Any, ClassVar
class StorageLevel:
@@ -31,6 +31,7 @@ class StorageLevel:
formats.
"""
+ NONE: ClassVar["StorageLevel"]
DISK_ONLY: ClassVar["StorageLevel"]
DISK_ONLY_2: ClassVar["StorageLevel"]
DISK_ONLY_3: ClassVar["StorageLevel"]
@@ -73,7 +74,26 @@ class StorageLevel:
result += "%sx Replicated" % self.replication
return result
+ def __eq__(self, other: Any) -> bool:
+ """Return True if ``other`` is a StorageLevel with identical settings."""
+ return (
+ isinstance(other, StorageLevel)
+ and self.useMemory == other.useMemory
+ and self.useDisk == other.useDisk
+ and self.useOffHeap == other.useOffHeap
+ and self.deserialized == other.deserialized
+ and self.replication == other.replication
+ )
+
+ def __hash__(self) -> int:
+ # Defining __eq__ alone sets __hash__ to None (instances become unhashable);
+ # hash exactly the fields __eq__ compares so that equal levels hash equally.
+ return hash(
+ (self.useDisk, self.useMemory, self.useOffHeap, self.deserialized, self.replication)
+ )
+
+StorageLevel.NONE = StorageLevel(False, False, False, False)
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org