Posted to commits@spark.apache.org by gu...@apache.org on 2023/03/22 01:18:34 UTC

[spark] branch branch-3.4 updated: [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 5622981c885 [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
5622981c885 is described below

commit 5622981c88542bd1508a09fd0b45bcfb0f1fc136
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Wed Mar 22 10:18:10 2023 +0900

    [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
    
    ### What changes were proposed in this pull request?
    
    Implements `DataFrame.cache`, `persist`, `unpersist`, and `storageLevel` for the Spark Connect Python client, backed by new `Persist`, `Unpersist`, and `GetStorageLevel` analyze requests and a `StorageLevel` protobuf message.
    
    ### Why are the changes needed?
    
    These DataFrame APIs are currently missing from the Spark Connect Python client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `DataFrame.cache`, `persist`, `unpersist`, and `storageLevel` will be available in the Spark Connect Python client.
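    
    For example (a sketch, not part of the PR itself; assumes `spark` is a SparkSession backed by Spark Connect):
    
    ```python
    from pyspark.storagelevel import StorageLevel
    
    df = spark.range(10)
    
    df.persist(StorageLevel.DISK_ONLY)  # cache() would use the default level instead
    print(df.storageLevel)              # Disk Serialized 1x Replicated
    
    df.unpersist(blocking=True)         # drop the cached data, waiting until blocks are deleted
    print(df.storageLevel == StorageLevel.NONE)  # True
    ```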
    
    ### How was this patch tested?
    
    Added/enabled the related tests.
    
    Closes #40510 from ueshin/issues/SPARK-42889/cache.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit b4f02248972c357cc2af6881b10565315ea15cb4)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../src/main/protobuf/spark/connect/base.proto     |  50 ++++
 .../common/StorageLevelProtoConverter.scala        |  46 ++++
 .../service/SparkConnectAnalyzeHandler.scala       |  33 ++-
 python/pyspark/sql/connect/client.py               |  36 +++
 python/pyspark/sql/connect/dataframe.py            |  56 +++-
 python/pyspark/sql/connect/proto/base_pb2.py       | 298 ++++++++++++++-------
 python/pyspark/sql/connect/proto/base_pb2.pyi      | 228 ++++++++++++++++
 python/pyspark/sql/dataframe.py                    |  20 ++
 .../sql/tests/connect/test_connect_basic.py        |   3 -
 python/pyspark/sql/tests/test_dataframe.py         |  26 +-
 python/pyspark/storagelevel.py                     |  14 +-
 11 files changed, 692 insertions(+), 118 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index da0f974a749..591f32cea1b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -54,6 +54,20 @@ message UserContext {
   repeated google.protobuf.Any extensions = 999;
 }
 
+// StorageLevel for persisting Datasets/Tables.
+message StorageLevel {
+  // (Required) Whether the cache should use disk or not.
+  bool use_disk = 1;
+  // (Required) Whether the cache should use memory or not.
+  bool use_memory = 2;
+  // (Required) Whether the cache should use off-heap or not.
+  bool use_off_heap = 3;
+  // (Required) Whether the cached data is deserialized or not.
+  bool deserialized = 4;
+  // (Required) The number of replicas.
+  int32 replication = 5;
+}
+
 // Request to perform plan analyze, optionally to explain the plan.
 message AnalyzePlanRequest {
   // (Required)
@@ -82,6 +96,9 @@ message AnalyzePlanRequest {
     DDLParse ddl_parse = 11;
     SameSemantics same_semantics = 12;
     SemanticHash semantic_hash = 13;
+    Persist persist = 14;
+    Unpersist unpersist = 15;
+    GetStorageLevel get_storage_level = 16;
   }
 
   message Schema {
@@ -163,6 +180,27 @@ message AnalyzePlanRequest {
     // (Required) The logical plan to get a hashCode.
     Plan plan = 1;
   }
+
+  message Persist {
+    // (Required) The logical plan to persist.
+    Relation relation = 1;
+
+    // (Optional) The storage level.
+    optional StorageLevel storage_level = 2;
+  }
+
+  message Unpersist {
+    // (Required) The logical plan to unpersist.
+    Relation relation = 1;
+
+    // (Optional) Whether to block until all blocks are deleted.
+    optional bool blocking = 2;
+  }
+
+  message GetStorageLevel {
+    // (Required) The logical plan to get the storage level.
+    Relation relation = 1;
+  }
 }
 
 // Response to performing analysis of the query. Contains relevant metadata to be able to
@@ -181,6 +219,9 @@ message AnalyzePlanResponse {
     DDLParse ddl_parse = 9;
     SameSemantics same_semantics = 10;
     SemanticHash semantic_hash = 11;
+    Persist persist = 12;
+    Unpersist unpersist = 13;
+    GetStorageLevel get_storage_level = 14;
   }
 
   message Schema {
@@ -223,6 +264,15 @@ message AnalyzePlanResponse {
   message SemanticHash {
     int32 result = 1;
   }
+
+  message Persist { }
+
+  message Unpersist { }
+
+  message GetStorageLevel {
+    // (Required) The StorageLevel as a result of the get_storage_level request.
+    StorageLevel storage_level = 1;
+  }
 }
 
 // A request to be executed by the service.
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala
new file mode 100644
index 00000000000..7bf273843b5
--- /dev/null
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import org.apache.spark.connect.proto
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Helper class for conversions between [[StorageLevel]] and [[proto.StorageLevel]].
+ */
+object StorageLevelProtoConverter {
+  def toStorageLevel(sl: proto.StorageLevel): StorageLevel = {
+    StorageLevel(
+      useDisk = sl.getUseDisk,
+      useMemory = sl.getUseMemory,
+      useOffHeap = sl.getUseOffHeap,
+      deserialized = sl.getDeserialized,
+      replication = sl.getReplication)
+  }
+
+  def toConnectProtoType(sl: StorageLevel): proto.StorageLevel = {
+    proto.StorageLevel
+      .newBuilder()
+      .setUseDisk(sl.useDisk)
+      .setUseMemory(sl.useMemory)
+      .setUseOffHeap(sl.useOffHeap)
+      .setDeserialized(sl.deserialized)
+      .setReplication(sl.replication)
+      .build()
+  }
+}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
index 4697a1fd7d4..a03b827b60e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala
@@ -24,7 +24,7 @@ import io.grpc.stub.StreamObserver
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, StorageLevelProtoConverter}
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode}
 
@@ -161,6 +161,37 @@ private[connect] class SparkConnectAnalyzeHandler(
             .newBuilder()
             .setResult(semanticHash))
 
+      case proto.AnalyzePlanRequest.AnalyzeCase.PERSIST =>
+        val target = Dataset
+          .ofRows(session, planner.transformRelation(request.getPersist.getRelation))
+        if (request.getPersist.hasStorageLevel) {
+          target.persist(
+            StorageLevelProtoConverter.toStorageLevel(request.getPersist.getStorageLevel))
+        } else {
+          target.persist()
+        }
+        builder.setPersist(proto.AnalyzePlanResponse.Persist.newBuilder().build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.UNPERSIST =>
+        val target = Dataset
+          .ofRows(session, planner.transformRelation(request.getUnpersist.getRelation))
+        if (request.getUnpersist.hasBlocking) {
+          target.unpersist(request.getUnpersist.getBlocking)
+        } else {
+          target.unpersist()
+        }
+        builder.setUnpersist(proto.AnalyzePlanResponse.Unpersist.newBuilder().build())
+
+      case proto.AnalyzePlanRequest.AnalyzeCase.GET_STORAGE_LEVEL =>
+        val target = Dataset
+          .ofRows(session, planner.transformRelation(request.getGetStorageLevel.getRelation))
+        val storageLevel = target.storageLevel
+        builder.setGetStorageLevel(
+          proto.AnalyzePlanResponse.GetStorageLevel
+            .newBuilder()
+            .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(storageLevel))
+            .build())
+
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
 
diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index 4db852c951e..6de78e72af8 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -74,6 +74,7 @@ from pyspark.sql.connect.expressions import (
 from pyspark.sql.pandas.types import _check_series_localize_timestamps, _convert_map_items_to_dict
 from pyspark.sql.types import DataType, MapType, StructType, TimestampType
 from pyspark.rdd import PythonEvalType
+from pyspark.storagelevel import StorageLevel
 
 
 if TYPE_CHECKING:
@@ -413,6 +414,7 @@ class AnalyzeResult:
         parsed: Optional[DataType],
         is_same_semantics: Optional[bool],
         semantic_hash: Optional[int],
+        storage_level: Optional[StorageLevel],
     ):
         self.schema = schema
         self.explain_string = explain_string
@@ -424,6 +426,7 @@ class AnalyzeResult:
         self.parsed = parsed
         self.is_same_semantics = is_same_semantics
         self.semantic_hash = semantic_hash
+        self.storage_level = storage_level
 
     @classmethod
     def fromProto(cls, pb: Any) -> "AnalyzeResult":
@@ -437,6 +440,7 @@ class AnalyzeResult:
         parsed: Optional[DataType] = None
         is_same_semantics: Optional[bool] = None
         semantic_hash: Optional[int] = None
+        storage_level: Optional[StorageLevel] = None
 
         if pb.HasField("schema"):
             schema = types.proto_schema_to_pyspark_data_type(pb.schema.schema)
@@ -458,6 +462,18 @@ class AnalyzeResult:
             is_same_semantics = pb.same_semantics.result
         elif pb.HasField("semantic_hash"):
             semantic_hash = pb.semantic_hash.result
+        elif pb.HasField("persist"):
+            pass
+        elif pb.HasField("unpersist"):
+            pass
+        elif pb.HasField("get_storage_level"):
+            storage_level = StorageLevel(
+                useDisk=pb.get_storage_level.storage_level.use_disk,
+                useMemory=pb.get_storage_level.storage_level.use_memory,
+                useOffHeap=pb.get_storage_level.storage_level.use_off_heap,
+                deserialized=pb.get_storage_level.storage_level.deserialized,
+                replication=pb.get_storage_level.storage_level.replication,
+            )
         else:
             raise SparkConnectException("No analyze result found!")
 
@@ -472,6 +488,7 @@ class AnalyzeResult:
             parsed,
             is_same_semantics,
             semantic_hash,
+            storage_level,
         )
 
 
@@ -820,6 +837,25 @@ class SparkConnectClient(object):
             req.same_semantics.other_plan.CopyFrom(cast(pb2.Plan, kwargs.get("other")))
         elif method == "semantic_hash":
             req.semantic_hash.plan.CopyFrom(cast(pb2.Plan, kwargs.get("plan")))
+        elif method == "persist":
+            req.persist.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
+            if kwargs.get("storage_level", None) is not None:
+                storage_level = cast(StorageLevel, kwargs.get("storage_level"))
+                req.persist.storage_level.CopyFrom(
+                    pb2.StorageLevel(
+                        use_disk=storage_level.useDisk,
+                        use_memory=storage_level.useMemory,
+                        use_off_heap=storage_level.useOffHeap,
+                        deserialized=storage_level.deserialized,
+                        replication=storage_level.replication,
+                    )
+                )
+        elif method == "unpersist":
+            req.unpersist.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
+            if kwargs.get("blocking", None) is not None:
+                req.unpersist.blocking = cast(bool, kwargs.get("blocking"))
+        elif method == "get_storage_level":
+            req.get_storage_level.relation.CopyFrom(cast(pb2.Relation, kwargs.get("relation")))
         else:
             raise ValueError(f"Unknown Analyze method: {method}")
 
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index f9ef561373d..2dfc8e72193 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -53,6 +53,7 @@ from pyspark.sql.dataframe import (
 from pyspark.errors import PySparkTypeError, PySparkAttributeError
 from pyspark.errors.exceptions.connect import SparkConnectException
 from pyspark.rdd import PythonEvalType
+from pyspark.storagelevel import StorageLevel
 import pyspark.sql.connect.plan as plan
 from pyspark.sql.connect.group import GroupedData
 from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2
@@ -1538,14 +1539,54 @@ class DataFrame:
     def rdd(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("RDD Support for Spark Connect is not implemented.")
 
-    def unpersist(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("unpersist() is not implemented.")
+    def cache(self) -> "DataFrame":
+        if self._plan is None:
+            raise Exception("Cannot cache on empty plan.")
+        relation = self._plan.plan(self._session.client)
+        self._session.client._analyze(method="persist", relation=relation)
+        return self
+
+    cache.__doc__ = PySparkDataFrame.cache.__doc__
+
+    def persist(
+        self,
+        storageLevel: StorageLevel = (StorageLevel.MEMORY_AND_DISK_DESER),
+    ) -> "DataFrame":
+        if self._plan is None:
+            raise Exception("Cannot persist on empty plan.")
+        relation = self._plan.plan(self._session.client)
+        self._session.client._analyze(
+            method="persist", relation=relation, storage_level=storageLevel
+        )
+        return self
+
+    persist.__doc__ = PySparkDataFrame.persist.__doc__
 
-    def cache(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("cache() is not implemented.")
+    @property
+    def storageLevel(self) -> StorageLevel:
+        if self._plan is None:
+            raise Exception("Cannot persist on empty plan.")
+        relation = self._plan.plan(self._session.client)
+        storage_level = self._session.client._analyze(
+            method="get_storage_level", relation=relation
+        ).storage_level
+        assert storage_level is not None
+        return storage_level
+
+    storageLevel.__doc__ = PySparkDataFrame.storageLevel.__doc__
+
+    def unpersist(self, blocking: bool = False) -> "DataFrame":
+        if self._plan is None:
+            raise Exception("Cannot unpersist on empty plan.")
+        relation = self._plan.plan(self._session.client)
+        self._session.client._analyze(method="unpersist", relation=relation, blocking=blocking)
+        return self
+
+    unpersist.__doc__ = PySparkDataFrame.unpersist.__doc__
 
-    def persist(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("persist() is not implemented.")
+    @property
+    def is_cached(self) -> bool:
+        return self.storageLevel != StorageLevel.NONE
 
     def withWatermark(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("withWatermark() is not implemented.")
@@ -1577,9 +1618,6 @@ class DataFrame:
 
     registerTempTable.__doc__ = PySparkDataFrame.registerTempTable.__doc__
 
-    def storageLevel(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("storageLevel() is not implemented.")
-
     def _map_partitions(
         self,
         func: "PandasMapIterFunction",
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index 36557344893..a5222eca045 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,12 +37,13 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
+    b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06 [...]
 )
 
 
 _PLAN = DESCRIPTOR.message_types_by_name["Plan"]
 _USERCONTEXT = DESCRIPTOR.message_types_by_name["UserContext"]
+_STORAGELEVEL = DESCRIPTOR.message_types_by_name["StorageLevel"]
 _ANALYZEPLANREQUEST = DESCRIPTOR.message_types_by_name["AnalyzePlanRequest"]
 _ANALYZEPLANREQUEST_SCHEMA = _ANALYZEPLANREQUEST.nested_types_by_name["Schema"]
 _ANALYZEPLANREQUEST_EXPLAIN = _ANALYZEPLANREQUEST.nested_types_by_name["Explain"]
@@ -54,6 +55,9 @@ _ANALYZEPLANREQUEST_SPARKVERSION = _ANALYZEPLANREQUEST.nested_types_by_name["Spa
 _ANALYZEPLANREQUEST_DDLPARSE = _ANALYZEPLANREQUEST.nested_types_by_name["DDLParse"]
 _ANALYZEPLANREQUEST_SAMESEMANTICS = _ANALYZEPLANREQUEST.nested_types_by_name["SameSemantics"]
 _ANALYZEPLANREQUEST_SEMANTICHASH = _ANALYZEPLANREQUEST.nested_types_by_name["SemanticHash"]
+_ANALYZEPLANREQUEST_PERSIST = _ANALYZEPLANREQUEST.nested_types_by_name["Persist"]
+_ANALYZEPLANREQUEST_UNPERSIST = _ANALYZEPLANREQUEST.nested_types_by_name["Unpersist"]
+_ANALYZEPLANREQUEST_GETSTORAGELEVEL = _ANALYZEPLANREQUEST.nested_types_by_name["GetStorageLevel"]
 _ANALYZEPLANRESPONSE = DESCRIPTOR.message_types_by_name["AnalyzePlanResponse"]
 _ANALYZEPLANRESPONSE_SCHEMA = _ANALYZEPLANRESPONSE.nested_types_by_name["Schema"]
 _ANALYZEPLANRESPONSE_EXPLAIN = _ANALYZEPLANRESPONSE.nested_types_by_name["Explain"]
@@ -65,6 +69,9 @@ _ANALYZEPLANRESPONSE_SPARKVERSION = _ANALYZEPLANRESPONSE.nested_types_by_name["S
 _ANALYZEPLANRESPONSE_DDLPARSE = _ANALYZEPLANRESPONSE.nested_types_by_name["DDLParse"]
 _ANALYZEPLANRESPONSE_SAMESEMANTICS = _ANALYZEPLANRESPONSE.nested_types_by_name["SameSemantics"]
 _ANALYZEPLANRESPONSE_SEMANTICHASH = _ANALYZEPLANRESPONSE.nested_types_by_name["SemanticHash"]
+_ANALYZEPLANRESPONSE_PERSIST = _ANALYZEPLANRESPONSE.nested_types_by_name["Persist"]
+_ANALYZEPLANRESPONSE_UNPERSIST = _ANALYZEPLANRESPONSE.nested_types_by_name["Unpersist"]
+_ANALYZEPLANRESPONSE_GETSTORAGELEVEL = _ANALYZEPLANRESPONSE.nested_types_by_name["GetStorageLevel"]
 _EXECUTEPLANREQUEST = DESCRIPTOR.message_types_by_name["ExecutePlanRequest"]
 _EXECUTEPLANRESPONSE = DESCRIPTOR.message_types_by_name["ExecutePlanResponse"]
 _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT = _EXECUTEPLANRESPONSE.nested_types_by_name[
@@ -131,6 +138,17 @@ UserContext = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(UserContext)
 
+StorageLevel = _reflection.GeneratedProtocolMessageType(
+    "StorageLevel",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _STORAGELEVEL,
+        "__module__": "spark.connect.base_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.StorageLevel)
+    },
+)
+_sym_db.RegisterMessage(StorageLevel)
+
 AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
     "AnalyzePlanRequest",
     (_message.Message,),
@@ -225,6 +243,33 @@ AnalyzePlanRequest = _reflection.GeneratedProtocolMessageType(
                 # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.SemanticHash)
             },
         ),
+        "Persist": _reflection.GeneratedProtocolMessageType(
+            "Persist",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_PERSIST,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Persist)
+            },
+        ),
+        "Unpersist": _reflection.GeneratedProtocolMessageType(
+            "Unpersist",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_UNPERSIST,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.Unpersist)
+            },
+        ),
+        "GetStorageLevel": _reflection.GeneratedProtocolMessageType(
+            "GetStorageLevel",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANREQUEST_GETSTORAGELEVEL,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest.GetStorageLevel)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANREQUEST,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanRequest)
@@ -241,6 +286,9 @@ _sym_db.RegisterMessage(AnalyzePlanRequest.SparkVersion)
 _sym_db.RegisterMessage(AnalyzePlanRequest.DDLParse)
 _sym_db.RegisterMessage(AnalyzePlanRequest.SameSemantics)
 _sym_db.RegisterMessage(AnalyzePlanRequest.SemanticHash)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Persist)
+_sym_db.RegisterMessage(AnalyzePlanRequest.Unpersist)
+_sym_db.RegisterMessage(AnalyzePlanRequest.GetStorageLevel)
 
 AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
     "AnalyzePlanResponse",
@@ -336,6 +384,33 @@ AnalyzePlanResponse = _reflection.GeneratedProtocolMessageType(
                 # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.SemanticHash)
             },
         ),
+        "Persist": _reflection.GeneratedProtocolMessageType(
+            "Persist",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_PERSIST,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Persist)
+            },
+        ),
+        "Unpersist": _reflection.GeneratedProtocolMessageType(
+            "Unpersist",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_UNPERSIST,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.Unpersist)
+            },
+        ),
+        "GetStorageLevel": _reflection.GeneratedProtocolMessageType(
+            "GetStorageLevel",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _ANALYZEPLANRESPONSE_GETSTORAGELEVEL,
+                "__module__": "spark.connect.base_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse.GetStorageLevel)
+            },
+        ),
         "DESCRIPTOR": _ANALYZEPLANRESPONSE,
         "__module__": "spark.connect.base_pb2"
         # @@protoc_insertion_point(class_scope:spark.connect.AnalyzePlanResponse)
@@ -352,6 +427,9 @@ _sym_db.RegisterMessage(AnalyzePlanResponse.SparkVersion)
 _sym_db.RegisterMessage(AnalyzePlanResponse.DDLParse)
 _sym_db.RegisterMessage(AnalyzePlanResponse.SameSemantics)
 _sym_db.RegisterMessage(AnalyzePlanResponse.SemanticHash)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Persist)
+_sym_db.RegisterMessage(AnalyzePlanResponse.Unpersist)
+_sym_db.RegisterMessage(AnalyzePlanResponse.GetStorageLevel)
 
 ExecutePlanRequest = _reflection.GeneratedProtocolMessageType(
     "ExecutePlanRequest",
@@ -641,106 +719,120 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _PLAN._serialized_end = 307
     _USERCONTEXT._serialized_start = 309
     _USERCONTEXT._serialized_end = 431
-    _ANALYZEPLANREQUEST._serialized_start = 434
-    _ANALYZEPLANREQUEST._serialized_end = 2235
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1384
-    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1433
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1436
-    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 1751
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 1579
-    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 1751
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 1753
-    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 1806
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 1808
-    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 1858
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 1860
-    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 1914
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 1916
-    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 1969
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 1971
-    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 1985
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 1987
-    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2028
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2030
-    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2151
-    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2153
-    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2208
-    _ANALYZEPLANRESPONSE._serialized_start = 2238
-    _ANALYZEPLANRESPONSE._serialized_end = 3570
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 3098
-    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 3155
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 3157
-    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 3205
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 3207
-    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 3252
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 3254
-    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 3290
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 3292
-    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 3340
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 3342
-    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 3376
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 3378
-    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 3418
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 3420
-    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 3479
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 3481
-    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 3520
-    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 3522
-    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 3560
-    _EXECUTEPLANREQUEST._serialized_start = 3573
-    _EXECUTEPLANREQUEST._serialized_end = 3782
-    _EXECUTEPLANRESPONSE._serialized_start = 3785
-    _EXECUTEPLANRESPONSE._serialized_end = 5060
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 4291
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 4362
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 4364
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 4425
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 4428
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 4945
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 4523
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 4855
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 4732
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 4855
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 4857
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 4945
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 4947
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 5043
-    _KEYVALUE._serialized_start = 5062
-    _KEYVALUE._serialized_end = 5127
-    _CONFIGREQUEST._serialized_start = 5130
-    _CONFIGREQUEST._serialized_end = 6158
-    _CONFIGREQUEST_OPERATION._serialized_start = 5350
-    _CONFIGREQUEST_OPERATION._serialized_end = 5848
-    _CONFIGREQUEST_SET._serialized_start = 5850
-    _CONFIGREQUEST_SET._serialized_end = 5902
-    _CONFIGREQUEST_GET._serialized_start = 5904
-    _CONFIGREQUEST_GET._serialized_end = 5929
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 5931
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 5994
-    _CONFIGREQUEST_GETOPTION._serialized_start = 5996
-    _CONFIGREQUEST_GETOPTION._serialized_end = 6027
-    _CONFIGREQUEST_GETALL._serialized_start = 6029
-    _CONFIGREQUEST_GETALL._serialized_end = 6077
-    _CONFIGREQUEST_UNSET._serialized_start = 6079
-    _CONFIGREQUEST_UNSET._serialized_end = 6106
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 6108
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 6142
-    _CONFIGRESPONSE._serialized_start = 6160
-    _CONFIGRESPONSE._serialized_end = 6282
-    _ADDARTIFACTSREQUEST._serialized_start = 6285
-    _ADDARTIFACTSREQUEST._serialized_end = 7156
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 6672
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 6725
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 6727
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 6838
-    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 6840
-    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 6933
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 6936
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 7129
-    _ADDARTIFACTSRESPONSE._serialized_start = 7159
-    _ADDARTIFACTSRESPONSE._serialized_end = 7347
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 7266
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 7347
-    _SPARKCONNECTSERVICE._serialized_start = 7350
-    _SPARKCONNECTSERVICE._serialized_end = 7715
+    _STORAGELEVEL._serialized_start = 434
+    _STORAGELEVEL._serialized_end = 610
+    _ANALYZEPLANREQUEST._serialized_start = 613
+    _ANALYZEPLANREQUEST._serialized_end = 2997
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_start = 1808
+    _ANALYZEPLANREQUEST_SCHEMA._serialized_end = 1857
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_start = 1860
+    _ANALYZEPLANREQUEST_EXPLAIN._serialized_end = 2175
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_start = 2003
+    _ANALYZEPLANREQUEST_EXPLAIN_EXPLAINMODE._serialized_end = 2175
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_start = 2177
+    _ANALYZEPLANREQUEST_TREESTRING._serialized_end = 2230
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_start = 2232
+    _ANALYZEPLANREQUEST_ISLOCAL._serialized_end = 2282
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_start = 2284
+    _ANALYZEPLANREQUEST_ISSTREAMING._serialized_end = 2338
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_start = 2340
+    _ANALYZEPLANREQUEST_INPUTFILES._serialized_end = 2393
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_start = 2395
+    _ANALYZEPLANREQUEST_SPARKVERSION._serialized_end = 2409
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_start = 2411
+    _ANALYZEPLANREQUEST_DDLPARSE._serialized_end = 2452
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_start = 2454
+    _ANALYZEPLANREQUEST_SAMESEMANTICS._serialized_end = 2575
+    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_start = 2577
+    _ANALYZEPLANREQUEST_SEMANTICHASH._serialized_end = 2632
+    _ANALYZEPLANREQUEST_PERSIST._serialized_start = 2635
+    _ANALYZEPLANREQUEST_PERSIST._serialized_end = 2786
+    _ANALYZEPLANREQUEST_UNPERSIST._serialized_start = 2788
+    _ANALYZEPLANREQUEST_UNPERSIST._serialized_end = 2898
+    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_start = 2900
+    _ANALYZEPLANREQUEST_GETSTORAGELEVEL._serialized_end = 2970
+    _ANALYZEPLANRESPONSE._serialized_start = 3000
+    _ANALYZEPLANRESPONSE._serialized_end = 4689
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_start = 4108
+    _ANALYZEPLANRESPONSE_SCHEMA._serialized_end = 4165
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_start = 4167
+    _ANALYZEPLANRESPONSE_EXPLAIN._serialized_end = 4215
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_start = 4217
+    _ANALYZEPLANRESPONSE_TREESTRING._serialized_end = 4262
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_start = 4264
+    _ANALYZEPLANRESPONSE_ISLOCAL._serialized_end = 4300
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_start = 4302
+    _ANALYZEPLANRESPONSE_ISSTREAMING._serialized_end = 4350
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_start = 4352
+    _ANALYZEPLANRESPONSE_INPUTFILES._serialized_end = 4386
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_start = 4388
+    _ANALYZEPLANRESPONSE_SPARKVERSION._serialized_end = 4428
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_start = 4430
+    _ANALYZEPLANRESPONSE_DDLPARSE._serialized_end = 4489
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_start = 4491
+    _ANALYZEPLANRESPONSE_SAMESEMANTICS._serialized_end = 4530
+    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_start = 4532
+    _ANALYZEPLANRESPONSE_SEMANTICHASH._serialized_end = 4570
+    _ANALYZEPLANRESPONSE_PERSIST._serialized_start = 2635
+    _ANALYZEPLANRESPONSE_PERSIST._serialized_end = 2644
+    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_start = 2788
+    _ANALYZEPLANRESPONSE_UNPERSIST._serialized_end = 2799
+    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_start = 4596
+    _ANALYZEPLANRESPONSE_GETSTORAGELEVEL._serialized_end = 4679
+    _EXECUTEPLANREQUEST._serialized_start = 4692
+    _EXECUTEPLANREQUEST._serialized_end = 4901
+    _EXECUTEPLANRESPONSE._serialized_start = 4904
+    _EXECUTEPLANRESPONSE._serialized_end = 6179
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 5410
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 5481
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 5483
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 5544
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 5547
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 6064
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 5642
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 5974
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 5851
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 5974
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 5976
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 6064
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 6066
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 6162
+    _KEYVALUE._serialized_start = 6181
+    _KEYVALUE._serialized_end = 6246
+    _CONFIGREQUEST._serialized_start = 6249
+    _CONFIGREQUEST._serialized_end = 7277
+    _CONFIGREQUEST_OPERATION._serialized_start = 6469
+    _CONFIGREQUEST_OPERATION._serialized_end = 6967
+    _CONFIGREQUEST_SET._serialized_start = 6969
+    _CONFIGREQUEST_SET._serialized_end = 7021
+    _CONFIGREQUEST_GET._serialized_start = 7023
+    _CONFIGREQUEST_GET._serialized_end = 7048
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 7050
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 7113
+    _CONFIGREQUEST_GETOPTION._serialized_start = 7115
+    _CONFIGREQUEST_GETOPTION._serialized_end = 7146
+    _CONFIGREQUEST_GETALL._serialized_start = 7148
+    _CONFIGREQUEST_GETALL._serialized_end = 7196
+    _CONFIGREQUEST_UNSET._serialized_start = 7198
+    _CONFIGREQUEST_UNSET._serialized_end = 7225
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 7227
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 7261
+    _CONFIGRESPONSE._serialized_start = 7279
+    _CONFIGRESPONSE._serialized_end = 7401
+    _ADDARTIFACTSREQUEST._serialized_start = 7404
+    _ADDARTIFACTSREQUEST._serialized_end = 8275
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 7791
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 7844
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 7846
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 7957
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 7959
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 8052
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 8055
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 8248
+    _ADDARTIFACTSRESPONSE._serialized_start = 8278
+    _ADDARTIFACTSRESPONSE._serialized_end = 8466
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 8385
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 8466
+    _SPARKCONNECTSERVICE._serialized_start = 8469
+    _SPARKCONNECTSERVICE._serialized_end = 8834
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 4c020308d9a..8c1f9f09d61 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -132,6 +132,53 @@ class UserContext(google.protobuf.message.Message):
 
 global___UserContext = UserContext
 
+class StorageLevel(google.protobuf.message.Message):
+    """StorageLevel for persisting Datasets/Tables."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    USE_DISK_FIELD_NUMBER: builtins.int
+    USE_MEMORY_FIELD_NUMBER: builtins.int
+    USE_OFF_HEAP_FIELD_NUMBER: builtins.int
+    DESERIALIZED_FIELD_NUMBER: builtins.int
+    REPLICATION_FIELD_NUMBER: builtins.int
+    use_disk: builtins.bool
+    """(Required) Whether the cache should use disk or not."""
+    use_memory: builtins.bool
+    """(Required) Whether the cache should use memory or not."""
+    use_off_heap: builtins.bool
+    """(Required) Whether the cache should use off-heap or not."""
+    deserialized: builtins.bool
+    """(Required) Whether the cached data is deserialized or not."""
+    replication: builtins.int
+    """(Required) The number of replicas."""
+    def __init__(
+        self,
+        *,
+        use_disk: builtins.bool = ...,
+        use_memory: builtins.bool = ...,
+        use_off_heap: builtins.bool = ...,
+        deserialized: builtins.bool = ...,
+        replication: builtins.int = ...,
+    ) -> None: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "deserialized",
+            b"deserialized",
+            "replication",
+            b"replication",
+            "use_disk",
+            b"use_disk",
+            "use_memory",
+            b"use_memory",
+            "use_off_heap",
+            b"use_off_heap",
+        ],
+    ) -> None: ...
+
+global___StorageLevel = StorageLevel
+
 class AnalyzePlanRequest(google.protobuf.message.Message):
     """Request to perform plan analyze, optionally to explain the plan."""
 
@@ -367,6 +414,100 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         ) -> builtins.bool: ...
         def ClearField(self, field_name: typing_extensions.Literal["plan", b"plan"]) -> None: ...
 
+    class Persist(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        RELATION_FIELD_NUMBER: builtins.int
+        STORAGE_LEVEL_FIELD_NUMBER: builtins.int
+        @property
+        def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+            """(Required) The logical plan to persist."""
+        @property
+        def storage_level(self) -> global___StorageLevel:
+            """(Optional) The storage level."""
+        def __init__(
+            self,
+            *,
+            relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+            storage_level: global___StorageLevel | None = ...,
+        ) -> None: ...
+        def HasField(
+            self,
+            field_name: typing_extensions.Literal[
+                "_storage_level",
+                b"_storage_level",
+                "relation",
+                b"relation",
+                "storage_level",
+                b"storage_level",
+            ],
+        ) -> builtins.bool: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "_storage_level",
+                b"_storage_level",
+                "relation",
+                b"relation",
+                "storage_level",
+                b"storage_level",
+            ],
+        ) -> None: ...
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_storage_level", b"_storage_level"]
+        ) -> typing_extensions.Literal["storage_level"] | None: ...
+
+    class Unpersist(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        RELATION_FIELD_NUMBER: builtins.int
+        BLOCKING_FIELD_NUMBER: builtins.int
+        @property
+        def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+            """(Required) The logical plan to unpersist."""
+        blocking: builtins.bool
+        """(Optional) Whether to block until all blocks are deleted."""
+        def __init__(
+            self,
+            *,
+            relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+            blocking: builtins.bool | None = ...,
+        ) -> None: ...
+        def HasField(
+            self,
+            field_name: typing_extensions.Literal[
+                "_blocking", b"_blocking", "blocking", b"blocking", "relation", b"relation"
+            ],
+        ) -> builtins.bool: ...
+        def ClearField(
+            self,
+            field_name: typing_extensions.Literal[
+                "_blocking", b"_blocking", "blocking", b"blocking", "relation", b"relation"
+            ],
+        ) -> None: ...
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_blocking", b"_blocking"]
+        ) -> typing_extensions.Literal["blocking"] | None: ...
+
+    class GetStorageLevel(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        RELATION_FIELD_NUMBER: builtins.int
+        @property
+        def relation(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
+            """(Required) The logical plan to get the storage level."""
+        def __init__(
+            self,
+            *,
+            relation: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["relation", b"relation"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["relation", b"relation"]
+        ) -> None: ...
+
     SESSION_ID_FIELD_NUMBER: builtins.int
     USER_CONTEXT_FIELD_NUMBER: builtins.int
     CLIENT_TYPE_FIELD_NUMBER: builtins.int
@@ -380,6 +521,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     DDL_PARSE_FIELD_NUMBER: builtins.int
     SAME_SEMANTICS_FIELD_NUMBER: builtins.int
     SEMANTIC_HASH_FIELD_NUMBER: builtins.int
+    PERSIST_FIELD_NUMBER: builtins.int
+    UNPERSIST_FIELD_NUMBER: builtins.int
+    GET_STORAGE_LEVEL_FIELD_NUMBER: builtins.int
     session_id: builtins.str
     """(Required)
 
@@ -415,6 +559,12 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
     def same_semantics(self) -> global___AnalyzePlanRequest.SameSemantics: ...
     @property
     def semantic_hash(self) -> global___AnalyzePlanRequest.SemanticHash: ...
+    @property
+    def persist(self) -> global___AnalyzePlanRequest.Persist: ...
+    @property
+    def unpersist(self) -> global___AnalyzePlanRequest.Unpersist: ...
+    @property
+    def get_storage_level(self) -> global___AnalyzePlanRequest.GetStorageLevel: ...
     def __init__(
         self,
         *,
@@ -431,6 +581,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         ddl_parse: global___AnalyzePlanRequest.DDLParse | None = ...,
         same_semantics: global___AnalyzePlanRequest.SameSemantics | None = ...,
         semantic_hash: global___AnalyzePlanRequest.SemanticHash | None = ...,
+        persist: global___AnalyzePlanRequest.Persist | None = ...,
+        unpersist: global___AnalyzePlanRequest.Unpersist | None = ...,
+        get_storage_level: global___AnalyzePlanRequest.GetStorageLevel | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -445,12 +598,16 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"ddl_parse",
             "explain",
             b"explain",
+            "get_storage_level",
+            b"get_storage_level",
             "input_files",
             b"input_files",
             "is_local",
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "persist",
+            b"persist",
             "same_semantics",
             b"same_semantics",
             "schema",
@@ -461,6 +618,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"spark_version",
             "tree_string",
             b"tree_string",
+            "unpersist",
+            b"unpersist",
             "user_context",
             b"user_context",
         ],
@@ -478,12 +637,16 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"ddl_parse",
             "explain",
             b"explain",
+            "get_storage_level",
+            b"get_storage_level",
             "input_files",
             b"input_files",
             "is_local",
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "persist",
+            b"persist",
             "same_semantics",
             b"same_semantics",
             "schema",
@@ -496,6 +659,8 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
             b"spark_version",
             "tree_string",
             b"tree_string",
+            "unpersist",
+            b"unpersist",
             "user_context",
             b"user_context",
         ],
@@ -518,6 +683,9 @@ class AnalyzePlanRequest(google.protobuf.message.Message):
         "ddl_parse",
         "same_semantics",
         "semantic_hash",
+        "persist",
+        "unpersist",
+        "get_storage_level",
     ] | None: ...
 
 global___AnalyzePlanRequest = AnalyzePlanRequest
@@ -679,6 +847,39 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             self, field_name: typing_extensions.Literal["result", b"result"]
         ) -> None: ...
 
+    class Persist(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        def __init__(
+            self,
+        ) -> None: ...
+
+    class Unpersist(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        def __init__(
+            self,
+        ) -> None: ...
+
+    class GetStorageLevel(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        STORAGE_LEVEL_FIELD_NUMBER: builtins.int
+        @property
+        def storage_level(self) -> global___StorageLevel:
+            """(Required) The StorageLevel as a result of get_storage_level request."""
+        def __init__(
+            self,
+            *,
+            storage_level: global___StorageLevel | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["storage_level", b"storage_level"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["storage_level", b"storage_level"]
+        ) -> None: ...
+
     SESSION_ID_FIELD_NUMBER: builtins.int
     SCHEMA_FIELD_NUMBER: builtins.int
     EXPLAIN_FIELD_NUMBER: builtins.int
@@ -690,6 +891,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     DDL_PARSE_FIELD_NUMBER: builtins.int
     SAME_SEMANTICS_FIELD_NUMBER: builtins.int
     SEMANTIC_HASH_FIELD_NUMBER: builtins.int
+    PERSIST_FIELD_NUMBER: builtins.int
+    UNPERSIST_FIELD_NUMBER: builtins.int
+    GET_STORAGE_LEVEL_FIELD_NUMBER: builtins.int
     session_id: builtins.str
     @property
     def schema(self) -> global___AnalyzePlanResponse.Schema: ...
@@ -711,6 +915,12 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
     def same_semantics(self) -> global___AnalyzePlanResponse.SameSemantics: ...
     @property
     def semantic_hash(self) -> global___AnalyzePlanResponse.SemanticHash: ...
+    @property
+    def persist(self) -> global___AnalyzePlanResponse.Persist: ...
+    @property
+    def unpersist(self) -> global___AnalyzePlanResponse.Unpersist: ...
+    @property
+    def get_storage_level(self) -> global___AnalyzePlanResponse.GetStorageLevel: ...
     def __init__(
         self,
         *,
@@ -725,6 +935,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
         ddl_parse: global___AnalyzePlanResponse.DDLParse | None = ...,
         same_semantics: global___AnalyzePlanResponse.SameSemantics | None = ...,
         semantic_hash: global___AnalyzePlanResponse.SemanticHash | None = ...,
+        persist: global___AnalyzePlanResponse.Persist | None = ...,
+        unpersist: global___AnalyzePlanResponse.Unpersist | None = ...,
+        get_storage_level: global___AnalyzePlanResponse.GetStorageLevel | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -733,12 +946,16 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"ddl_parse",
             "explain",
             b"explain",
+            "get_storage_level",
+            b"get_storage_level",
             "input_files",
             b"input_files",
             "is_local",
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "persist",
+            b"persist",
             "result",
             b"result",
             "same_semantics",
@@ -751,6 +968,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"spark_version",
             "tree_string",
             b"tree_string",
+            "unpersist",
+            b"unpersist",
         ],
     ) -> builtins.bool: ...
     def ClearField(
@@ -760,12 +979,16 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"ddl_parse",
             "explain",
             b"explain",
+            "get_storage_level",
+            b"get_storage_level",
             "input_files",
             b"input_files",
             "is_local",
             b"is_local",
             "is_streaming",
             b"is_streaming",
+            "persist",
+            b"persist",
             "result",
             b"result",
             "same_semantics",
@@ -780,6 +1003,8 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
             b"spark_version",
             "tree_string",
             b"tree_string",
+            "unpersist",
+            b"unpersist",
         ],
     ) -> None: ...
     def WhichOneof(
@@ -795,6 +1020,9 @@ class AnalyzePlanResponse(google.protobuf.message.Message):
         "ddl_parse",
         "same_semantics",
         "semantic_hash",
+        "persist",
+        "unpersist",
+        "get_storage_level",
     ] | None: ...
 
 global___AnalyzePlanResponse = AnalyzePlanResponse
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index d520aa16999..b5e6e49fb05 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1421,6 +1421,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         .. versionadded:: 1.3.0
 
+        .. versionchanged:: 3.4.0
+            Supports Spark Connect.
+
         Notes
         -----
         The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
@@ -1435,6 +1438,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> df = spark.range(1)
         >>> df.cache()
         DataFrame[id: bigint]
+
+        >>> df.explain()
+        == Physical Plan ==
+        InMemoryTableScan ...
         """
         self.is_cached = True
         self._jdf.cache()
@@ -1451,6 +1458,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         .. versionadded:: 1.3.0
 
+        .. versionchanged:: 3.4.0
+            Supports Spark Connect.
+
         Notes
         -----
         The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0.
@@ -1471,6 +1481,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         >>> df.persist()
         DataFrame[id: bigint]
 
+        >>> df.explain()
+        == Physical Plan ==
+        InMemoryTableScan ...
+
         Persists the data in the disk by specifying the storage level.
 
         >>> from pyspark.storagelevel import StorageLevel
@@ -1488,6 +1502,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         .. versionadded:: 2.1.0
 
+        .. versionchanged:: 3.4.0
+            Supports Spark Connect.
+
         Returns
         -------
         :class:`StorageLevel`
@@ -1521,6 +1538,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         .. versionadded:: 1.3.0
 
+        .. versionchanged:: 3.4.0
+            Supports Spark Connect.
+
         Notes
         -----
         `blocking` default has changed to ``False`` to match Scala in 2.0.
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 0a47dc193c9..f911ca9ba78 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2827,9 +2827,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
         df = self.connect.read.table(self.tbl_name)
         for f in (
             "rdd",
-            "unpersist",
-            "cache",
-            "persist",
             "withWatermark",
             "foreach",
             "foreachPartition",
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index cb209f472bf..0ce32ec4abf 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -43,6 +43,7 @@ from pyspark.sql.types import (
     FloatType,
     DayTimeIntervalType,
 )
+from pyspark.storagelevel import StorageLevel
 from pyspark.errors import (
     AnalysisException,
     IllegalArgumentException,
@@ -1029,7 +1030,30 @@ class DataFrameTestsMixin:
             # works with crossJoin
             self.assertEqual(1, df1.crossJoin(df2).count())
 
-    def test_cache(self):
+    def test_cache_dataframe(self):
+        df = self.spark.createDataFrame([(2, 2), (3, 3)])
+        try:
+            self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+            df.cache()
+            self.assertEqual(df.storageLevel, StorageLevel.MEMORY_AND_DISK)
+
+            df.unpersist()
+            self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+            df.persist()
+            self.assertEqual(df.storageLevel, StorageLevel.MEMORY_AND_DISK_DESER)
+
+            df.unpersist(blocking=True)
+            self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+            df.persist(StorageLevel.DISK_ONLY)
+            self.assertEqual(df.storageLevel, StorageLevel.DISK_ONLY)
+        finally:
+            df.unpersist()
+            self.assertEqual(df.storageLevel, StorageLevel.NONE)
+
+    def test_cache_table(self):
         spark = self.spark
         with self.tempView("tab1", "tab2"):
             spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab1")
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 754e0b9fac4..c9a70fabdf7 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -17,7 +17,7 @@
 
 __all__ = ["StorageLevel"]
 
-from typing import ClassVar
+from typing import Any, ClassVar
 
 
 class StorageLevel:
@@ -31,6 +31,7 @@ class StorageLevel:
     formats.
     """
 
+    NONE: ClassVar["StorageLevel"]
     DISK_ONLY: ClassVar["StorageLevel"]
     DISK_ONLY_2: ClassVar["StorageLevel"]
     DISK_ONLY_3: ClassVar["StorageLevel"]
@@ -73,7 +74,18 @@ class StorageLevel:
         result += "%sx Replicated" % self.replication
         return result
 
+    def __eq__(self, other: Any) -> bool:
+        return (
+            isinstance(other, StorageLevel)
+            and self.useMemory == other.useMemory
+            and self.useDisk == other.useDisk
+            and self.useOffHeap == other.useOffHeap
+            and self.deserialized == other.deserialized
+            and self.replication == other.replication
+        )
+
 
+StorageLevel.NONE = StorageLevel(False, False, False, False)
 StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
 StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
 StorageLevel.DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)
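
A quick illustration of the equality semantics added to `python/pyspark/storagelevel.py` above (a sketch; `StorageLevel.NONE` and `__eq__` are introduced by this patch):

```python
from pyspark.storagelevel import StorageLevel

# Two levels compare equal when all five attributes
# (useDisk, useMemory, useOffHeap, deserialized, replication) match.
assert StorageLevel(True, False, False, False) == StorageLevel.DISK_ONLY
assert StorageLevel.DISK_ONLY != StorageLevel.DISK_ONLY_2  # replication differs
assert StorageLevel(False, False, False, False) == StorageLevel.NONE
```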

