Posted to commits@spark.apache.org by gu...@apache.org on 2023/02/07 06:27:38 UTC

[spark] branch branch-3.4 updated: [SPARK-41600][SPARK-41623][SPARK-41612][CONNECT] Implement Catalog.cacheTable, isCached and uncache

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1bc7b073c04 [SPARK-41600][SPARK-41623][SPARK-41612][CONNECT] Implement Catalog.cacheTable, isCached and uncache
1bc7b073c04 is described below

commit 1bc7b073c04629ec8e950ec986a2d6f7fdb84467
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Tue Feb 7 15:27:00 2023 +0900

    [SPARK-41600][SPARK-41623][SPARK-41612][CONNECT] Implement Catalog.cacheTable, isCached and uncache
    
    ### What changes were proposed in this pull request?
    
    This PR adds the three APIs below to Spark Connect; a minimal usage sketch follows the list.
    - `Catalog.isCached`
    - `Catalog.cacheTable`
    - `Catalog.uncacheTable`
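    
    A minimal usage sketch (not part of this patch; the endpoint below is a placeholder for any running Spark Connect server):
    
    ```python
    from pyspark.sql import SparkSession
    
    # Connect to a Spark Connect server; the URL is illustrative only.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    
    spark.range(1).createOrReplaceTempView("tbl")
    spark.catalog.cacheTable("tbl")           # issues a CacheTable catalog command
    assert spark.catalog.isCached("tbl")      # issues IsCached and returns a bool
    spark.catalog.uncacheTable("tbl")         # issues an UncacheTable catalog command
    assert not spark.catalog.isCached("tbl")
    ```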
    
    ### Why are the changes needed?
    
    These were not added initially because of a design concern about their behaviour. However, we should provide the same API compatibility and behaviour as regular PySpark in any event, so these are proposed again.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No to end users.
    Yes to developers, because it adds three new APIs to Spark Connect.
    
    ### How was this patch tested?
    
    Unit tests were added, and previously skipped parity tests were enabled; a rough sketch of the round trip they exercise follows.
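    
    A self-contained sketch of the kind of round trip the cache-related tests exercise (the class name and endpoint are illustrative, not from this patch):
    
    ```python
    import unittest
    
    from pyspark.sql import SparkSession
    
    
    class CacheTableConnectSketch(unittest.TestCase):
        @classmethod
        def setUpClass(cls):
            # Placeholder endpoint; point it at any running Spark Connect server.
            cls.spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    
        def test_cache_round_trip(self):
            self.spark.range(1).createOrReplaceTempView("tbl")
            self.assertFalse(self.spark.catalog.isCached("tbl"))
            self.spark.catalog.cacheTable("tbl")
            self.assertTrue(self.spark.catalog.isCached("tbl"))
            self.spark.catalog.uncacheTable("tbl")
            self.assertFalse(self.spark.catalog.isCached("tbl"))
    
    
    if __name__ == "__main__":
        unittest.main()
    ```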
    
    Closes #39919 from HyukjinKwon/SPARK-41600-SPARK-41623-SPARK-41612.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 54b5cf687b838a5ed9af43d566ef3c7320fb37d8)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../src/main/protobuf/spark/connect/catalog.proto  |  46 +++----
 .../sql/connect/planner/SparkConnectPlanner.scala  |  46 +++----
 python/pyspark/sql/catalog.py                      |  15 ++-
 python/pyspark/sql/connect/catalog.py              |  50 +++----
 python/pyspark/sql/connect/plan.py                 |  74 ++++++-----
 python/pyspark/sql/connect/proto/catalog_pb2.py    | 146 +++++++++++++--------
 python/pyspark/sql/connect/proto/catalog_pb2.pyi   | 117 ++++++++++++-----
 .../sql/tests/connect/test_connect_basic.py        |   7 +-
 .../sql/tests/connect/test_parity_catalog.py       |  14 +-
 .../sql/tests/connect/test_parity_dataframe.py     |   5 -
 python/pyspark/sql/tests/test_dataframe.py         |   8 +-
 11 files changed, 293 insertions(+), 235 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
index daec3665fc5..b49be901526 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
@@ -44,12 +44,9 @@ message Catalog {
     DropTempView drop_temp_view = 15;
     DropGlobalTempView drop_global_temp_view = 16;
     RecoverPartitions recover_partitions = 17;
-    // TODO(SPARK-41612): Support Catalog.isCached
-    // IsCached is_cached = 18;
-    // TODO(SPARK-41600): Support Catalog.cacheTable
-    // CacheTable cache_table = 19;
-    // TODO(SPARK-41623): Support Catalog.uncacheTable
-    // UncacheTable uncache_table = 20;
+    IsCached is_cached = 18;
+    CacheTable cache_table = 19;
+    UncacheTable uncache_table = 20;
     ClearCache clear_cache = 21;
     RefreshTable refresh_table = 22;
     RefreshByPath refresh_by_path = 23;
@@ -185,26 +182,23 @@ message RecoverPartitions {
   string table_name = 1;
 }
 
-// TODO(SPARK-41612): Support Catalog.isCached
-//// See `spark.catalog.isCached`
-//message IsCached {
-//  // (Required)
-//  string table_name = 1;
-//}
-//
-// TODO(SPARK-41600): Support Catalog.cacheTable
-//// See `spark.catalog.cacheTable`
-//message CacheTable {
-//  // (Required)
-//  string table_name = 1;
-//}
-//
-// TODO(SPARK-41623): Support Catalog.uncacheTable
-//// See `spark.catalog.uncacheTable`
-//message UncacheTable {
-//  // (Required)
-//  string table_name = 1;
-//}
+// See `spark.catalog.isCached`
+message IsCached {
+  // (Required)
+  string table_name = 1;
+}
+
+// See `spark.catalog.cacheTable`
+message CacheTable {
+  // (Required)
+  string table_name = 1;
+}
+
+// See `spark.catalog.uncacheTable`
+message UncacheTable {
+  // (Required)
+  string table_name = 1;
+}
 
 // See `spark.catalog.clearCache`
 message ClearCache { }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 28071acf8a4..07a5e5bc156 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -166,13 +166,10 @@ class SparkConnectPlanner(val session: SparkSession) {
         transformDropGlobalTempView(catalog.getDropGlobalTempView)
       case proto.Catalog.CatTypeCase.RECOVER_PARTITIONS =>
         transformRecoverPartitions(catalog.getRecoverPartitions)
-      // TODO(SPARK-41612): Support Catalog.isCached
-      // case proto.Catalog.CatTypeCase.IS_CACHED => transformIsCached(catalog.getIsCached)
-      // TODO(SPARK-41600): Support Catalog.cacheTable
-      // case proto.Catalog.CatTypeCase.CACHE_TABLE => transformCacheTable(catalog.getCacheTable)
-      // TODO(SPARK-41623): Support Catalog.uncacheTable
-      // case proto.Catalog.CatTypeCase.UNCACHE_TABLE =>
-      //   transformUncacheTable(catalog.getUncacheTable)
+      case proto.Catalog.CatTypeCase.IS_CACHED => transformIsCached(catalog.getIsCached)
+      case proto.Catalog.CatTypeCase.CACHE_TABLE => transformCacheTable(catalog.getCacheTable)
+      case proto.Catalog.CatTypeCase.UNCACHE_TABLE =>
+        transformUncacheTable(catalog.getUncacheTable)
       case proto.Catalog.CatTypeCase.CLEAR_CACHE => transformClearCache(catalog.getClearCache)
       case proto.Catalog.CatTypeCase.REFRESH_TABLE =>
         transformRefreshTable(catalog.getRefreshTable)
@@ -1791,25 +1788,22 @@ class SparkConnectPlanner(val session: SparkSession) {
     emptyLocalRelation
   }
 
-// TODO(SPARK-41612): Support Catalog.isCached
-//  private def transformIsCached(getIsCached: proto.IsCached): LogicalPlan = {
-//    session
-//      .createDataset(session.catalog.isCached(getIsCached.getTableName) :: Nil)(
-//        Encoders.scalaBoolean)
-//      .logicalPlan
-//  }
-//
-// TODO(SPARK-41600): Support Catalog.cacheTable
-//  private def transformCacheTable(getCacheTable: proto.CacheTable): LogicalPlan = {
-//    session.catalog.cacheTable(getCacheTable.getTableName)
-//    emptyLocalRelation
-//  }
-//
-// TODO(SPARK-41623): Support Catalog.uncacheTable
-//  private def transformUncacheTable(getUncacheTable: proto.UncacheTable): LogicalPlan = {
-//    session.catalog.uncacheTable(getUncacheTable.getTableName)
-//    emptyLocalRelation
-//  }
+  private def transformIsCached(getIsCached: proto.IsCached): LogicalPlan = {
+    session
+      .createDataset(session.catalog.isCached(getIsCached.getTableName) :: Nil)(
+        Encoders.scalaBoolean)
+      .logicalPlan
+  }
+
+  private def transformCacheTable(getCacheTable: proto.CacheTable): LogicalPlan = {
+    session.catalog.cacheTable(getCacheTable.getTableName)
+    emptyLocalRelation
+  }
+
+  private def transformUncacheTable(getUncacheTable: proto.UncacheTable): LogicalPlan = {
+    session.catalog.uncacheTable(getUncacheTable.getTableName)
+    emptyLocalRelation
+  }
 
   private def transformClearCache(getClearCache: proto.ClearCache): LogicalPlan = {
     session.catalog.clearCache()
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index c8f7e115e06..a417f754a36 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -934,6 +934,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -956,7 +959,7 @@ class Catalog:
 
         Throw an analysis exception when the table does not exist.
 
-        >>> spark.catalog.isCached("not_existing_table")
+        >>> spark.catalog.isCached("not_existing_table")  # doctest: +SKIP
         Traceback (most recent call last):
             ...
         AnalysisException: ...
@@ -975,6 +978,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -991,7 +997,7 @@ class Catalog:
 
         Throw an analysis exception when the table does not exist.
 
-        >>> spark.catalog.cacheTable("not_existing_table")
+        >>> spark.catalog.cacheTable("not_existing_table")  # doctest: +SKIP
         Traceback (most recent call last):
             ...
         AnalysisException: ...
@@ -1009,6 +1015,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -1028,7 +1037,7 @@ class Catalog:
 
         Throw an analysis exception when the table does not exist.
 
-        >>> spark.catalog.uncacheTable("not_existing_table")  # doctest: +IGNORE_EXCEPTION_DETAIL
+        >>> spark.catalog.uncacheTable("not_existing_table")  # doctest: +SKIP
         Traceback (most recent call last):
             ...
         AnalysisException: ...
diff --git a/python/pyspark/sql/connect/catalog.py b/python/pyspark/sql/connect/catalog.py
index 4619cfec716..753b00755ad 100644
--- a/python/pyspark/sql/connect/catalog.py
+++ b/python/pyspark/sql/connect/catalog.py
@@ -270,25 +270,22 @@ class Catalog:
 
     dropGlobalTempView.__doc__ = PySparkCatalog.dropGlobalTempView.__doc__
 
-    # TODO(SPARK-41612): Support Catalog.isCached
-    # def isCached(self, tableName: str) -> bool:
-    #     pdf = self._catalog_to_pandas(plan.IsCached(table_name=tableName))
-    #     assert pdf is not None
-    #     return pdf.iloc[0].iloc[0]
-    #
-    # isCached.__doc__ = PySparkCatalog.isCached.__doc__
-    #
-    # TODO(SPARK-41600): Support Catalog.cacheTable
-    # def cacheTable(self, tableName: str) -> None:
-    #     self._catalog_to_pandas(plan.CacheTable(table_name=tableName))
-    #
-    # cacheTable.__doc__ = PySparkCatalog.cacheTable.__doc__
-    #
-    # TODO(SPARK-41623): Support Catalog.uncacheTable
-    # def uncacheTable(self, tableName: str) -> None:
-    #     self._catalog_to_pandas(plan.UncacheTable(table_name=tableName))
-    #
-    # uncacheTable.__doc__ = PySparkCatalog.uncacheTable.__doc__
+    def isCached(self, tableName: str) -> bool:
+        pdf = self._catalog_to_pandas(plan.IsCached(table_name=tableName))
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    isCached.__doc__ = PySparkCatalog.isCached.__doc__
+
+    def cacheTable(self, tableName: str) -> None:
+        self._catalog_to_pandas(plan.CacheTable(table_name=tableName))
+
+    cacheTable.__doc__ = PySparkCatalog.cacheTable.__doc__
+
+    def uncacheTable(self, tableName: str) -> None:
+        self._catalog_to_pandas(plan.UncacheTable(table_name=tableName))
+
+    uncacheTable.__doc__ = PySparkCatalog.uncacheTable.__doc__
 
     def clearCache(self) -> None:
         self._catalog_to_pandas(plan.ClearCache())
@@ -310,15 +307,6 @@ class Catalog:
 
     refreshByPath.__doc__ = PySparkCatalog.refreshByPath.__doc__
 
-    def isCached(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("isCached() is not implemented.")
-
-    def cacheTable(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("cacheTable() is not implemented.")
-
-    def uncacheTable(self, *args: Any, **kwargs: Any) -> None:
-        raise NotImplementedError("uncacheTable() is not implemented.")
-
     def registerFunction(self, *args: Any, **kwargs: Any) -> None:
         raise NotImplementedError("registerFunction() is not implemented.")
 
@@ -337,11 +325,7 @@ def _test() -> None:
         PySparkSession.builder.appName("sql.connect.catalog tests").remote("local[4]").getOrCreate()
     )
 
-    # TODO(SPARK-41612): Support Catalog.isCached
-    # TODO(SPARK-41600): Support Catalog.cacheTable
-    del pyspark.sql.connect.catalog.Catalog.clearCache.__doc__
-    del pyspark.sql.connect.catalog.Catalog.refreshTable.__doc__
-    del pyspark.sql.connect.catalog.Catalog.refreshByPath.__doc__
+    # TODO(SPARK-41818): java.lang.ClassNotFoundException) .DefaultSource
     del pyspark.sql.connect.catalog.Catalog.recoverPartitions.__doc__
 
     (failure_count, test_count) = doctest.testmod(
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 0945adf6d20..d1d41b6a690 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1729,45 +1729,47 @@ class RecoverPartitions(LogicalPlan):
         self._table_name = table_name
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
-        plan = proto.Relation(catalog=proto.Catalog(recover_partitions=proto.RecoverPartitions()))
-        plan.catalog.recover_partitions.table_name = self._table_name
+        plan = proto.Relation(
+            catalog=proto.Catalog(
+                recover_partitions=proto.RecoverPartitions(table_name=self._table_name)
+            )
+        )
         return plan
 
 
-# TODO(SPARK-41612): Support Catalog.isCached
-# class IsCached(LogicalPlan):
-#     def __init__(self, table_name: str) -> None:
-#         super().__init__(None)
-#         self._table_name = table_name
-#
-#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
-#         plan = proto.Relation(catalog=proto.Catalog(is_cached=proto.IsCached()))
-#         plan.catalog.is_cached.table_name = self._table_name
-#         return plan
-#
-#
-# TODO(SPARK-41600): Support Catalog.cacheTable
-# class CacheTable(LogicalPlan):
-#     def __init__(self, table_name: str) -> None:
-#         super().__init__(None)
-#         self._table_name = table_name
-#
-#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
-#         plan = proto.Relation(catalog=proto.Catalog(cache_table=proto.CacheTable()))
-#         plan.catalog.cache_table.table_name = self._table_name
-#         return plan
-#
-#
-# TODO(SPARK-41623): Support Catalog.uncacheTable
-# class UncacheTable(LogicalPlan):
-#     def __init__(self, table_name: str) -> None:
-#         super().__init__(None)
-#         self._table_name = table_name
-#
-#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
-#         plan = proto.Relation(catalog=proto.Catalog(uncache_table=proto.UncacheTable()))
-#         plan.catalog.uncache_table.table_name = self._table_name
-#         return plan
+class IsCached(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation(
+            catalog=proto.Catalog(is_cached=proto.IsCached(table_name=self._table_name))
+        )
+        return plan
+
+
+class CacheTable(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation(
+            catalog=proto.Catalog(cache_table=proto.CacheTable(table_name=self._table_name))
+        )
+        return plan
+
+
+class UncacheTable(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation(catalog=proto.Catalog(uncache_table=proto.UncacheTable()))
+        plan.catalog.uncache_table.table_name = self._table_name
+        return plan
 
 
 class ClearCache(LogicalPlan):
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.py b/python/pyspark/sql/connect/proto/catalog_pb2.py
index ea194c0caa1..06a88b45dd1 100644
--- a/python/pyspark/sql/connect/proto/catalog_pb2.py
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.py
@@ -33,7 +33,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1bspark/connect/catalog.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"\x8c\r\n\x07\x43\x61talog\x12K\n\x10\x63urrent_database\x18\x01 \x01(\x0b\x32\x1e.spark.connect.CurrentDatabaseH\x00R\x0f\x63urrentDatabase\x12U\n\x14set_current_database\x18\x02 \x01(\x0b\x32!.spark.connect.SetCurrentDatabaseH\x00R\x12setCurrentDatabase\x12\x45\n\x0elist_databases\x18\x03 \x01(\x0b\x32\x1c.spark.connect.ListDatabasesH\x00R\rlistDatabases\x12<\n\x0blist_tables\x18\x04 \x01(\x0b\x3 [...]
+    b'\n\x1bspark/connect/catalog.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"\xc6\x0e\n\x07\x43\x61talog\x12K\n\x10\x63urrent_database\x18\x01 \x01(\x0b\x32\x1e.spark.connect.CurrentDatabaseH\x00R\x0f\x63urrentDatabase\x12U\n\x14set_current_database\x18\x02 \x01(\x0b\x32!.spark.connect.SetCurrentDatabaseH\x00R\x12setCurrentDatabase\x12\x45\n\x0elist_databases\x18\x03 \x01(\x0b\x32\x1c.spark.connect.ListDatabasesH\x00R\rlistDatabases\x12<\n\x0blist_tables\x18\x04 \x01(\x0b\ [...]
 )
 
 
@@ -57,6 +57,9 @@ _CREATETABLE_OPTIONSENTRY = _CREATETABLE.nested_types_by_name["OptionsEntry"]
 _DROPTEMPVIEW = DESCRIPTOR.message_types_by_name["DropTempView"]
 _DROPGLOBALTEMPVIEW = DESCRIPTOR.message_types_by_name["DropGlobalTempView"]
 _RECOVERPARTITIONS = DESCRIPTOR.message_types_by_name["RecoverPartitions"]
+_ISCACHED = DESCRIPTOR.message_types_by_name["IsCached"]
+_CACHETABLE = DESCRIPTOR.message_types_by_name["CacheTable"]
+_UNCACHETABLE = DESCRIPTOR.message_types_by_name["UncacheTable"]
 _CLEARCACHE = DESCRIPTOR.message_types_by_name["ClearCache"]
 _REFRESHTABLE = DESCRIPTOR.message_types_by_name["RefreshTable"]
 _REFRESHBYPATH = DESCRIPTOR.message_types_by_name["RefreshByPath"]
@@ -281,6 +284,39 @@ RecoverPartitions = _reflection.GeneratedProtocolMessageType(
 )
 _sym_db.RegisterMessage(RecoverPartitions)
 
+IsCached = _reflection.GeneratedProtocolMessageType(
+    "IsCached",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _ISCACHED,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.IsCached)
+    },
+)
+_sym_db.RegisterMessage(IsCached)
+
+CacheTable = _reflection.GeneratedProtocolMessageType(
+    "CacheTable",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _CACHETABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CacheTable)
+    },
+)
+_sym_db.RegisterMessage(CacheTable)
+
+UncacheTable = _reflection.GeneratedProtocolMessageType(
+    "UncacheTable",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _UNCACHETABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.UncacheTable)
+    },
+)
+_sym_db.RegisterMessage(UncacheTable)
+
 ClearCache = _reflection.GeneratedProtocolMessageType(
     "ClearCache",
     (_message.Message,),
@@ -356,55 +392,61 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _CREATETABLE_OPTIONSENTRY._options = None
     _CREATETABLE_OPTIONSENTRY._serialized_options = b"8\001"
     _CATALOG._serialized_start = 74
-    _CATALOG._serialized_end = 1750
-    _CURRENTDATABASE._serialized_start = 1752
-    _CURRENTDATABASE._serialized_end = 1769
-    _SETCURRENTDATABASE._serialized_start = 1771
-    _SETCURRENTDATABASE._serialized_end = 1816
-    _LISTDATABASES._serialized_start = 1818
-    _LISTDATABASES._serialized_end = 1833
-    _LISTTABLES._serialized_start = 1835
-    _LISTTABLES._serialized_end = 1889
-    _LISTFUNCTIONS._serialized_start = 1891
-    _LISTFUNCTIONS._serialized_end = 1948
-    _LISTCOLUMNS._serialized_start = 1950
-    _LISTCOLUMNS._serialized_end = 2036
-    _GETDATABASE._serialized_start = 2038
-    _GETDATABASE._serialized_end = 2076
-    _GETTABLE._serialized_start = 2078
-    _GETTABLE._serialized_end = 2161
-    _GETFUNCTION._serialized_start = 2163
-    _GETFUNCTION._serialized_end = 2255
-    _DATABASEEXISTS._serialized_start = 2257
-    _DATABASEEXISTS._serialized_end = 2298
-    _TABLEEXISTS._serialized_start = 2300
-    _TABLEEXISTS._serialized_end = 2386
-    _FUNCTIONEXISTS._serialized_start = 2388
-    _FUNCTIONEXISTS._serialized_end = 2483
-    _CREATEEXTERNALTABLE._serialized_start = 2486
-    _CREATEEXTERNALTABLE._serialized_end = 2812
-    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_start = 2723
-    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_end = 2781
-    _CREATETABLE._serialized_start = 2815
-    _CREATETABLE._serialized_end = 3180
-    _CREATETABLE_OPTIONSENTRY._serialized_start = 2723
-    _CREATETABLE_OPTIONSENTRY._serialized_end = 2781
-    _DROPTEMPVIEW._serialized_start = 3182
-    _DROPTEMPVIEW._serialized_end = 3225
-    _DROPGLOBALTEMPVIEW._serialized_start = 3227
-    _DROPGLOBALTEMPVIEW._serialized_end = 3276
-    _RECOVERPARTITIONS._serialized_start = 3278
-    _RECOVERPARTITIONS._serialized_end = 3328
-    _CLEARCACHE._serialized_start = 3330
-    _CLEARCACHE._serialized_end = 3342
-    _REFRESHTABLE._serialized_start = 3344
-    _REFRESHTABLE._serialized_end = 3389
-    _REFRESHBYPATH._serialized_start = 3391
-    _REFRESHBYPATH._serialized_end = 3426
-    _CURRENTCATALOG._serialized_start = 3428
-    _CURRENTCATALOG._serialized_end = 3444
-    _SETCURRENTCATALOG._serialized_start = 3446
-    _SETCURRENTCATALOG._serialized_end = 3500
-    _LISTCATALOGS._serialized_start = 3502
-    _LISTCATALOGS._serialized_end = 3516
+    _CATALOG._serialized_end = 1936
+    _CURRENTDATABASE._serialized_start = 1938
+    _CURRENTDATABASE._serialized_end = 1955
+    _SETCURRENTDATABASE._serialized_start = 1957
+    _SETCURRENTDATABASE._serialized_end = 2002
+    _LISTDATABASES._serialized_start = 2004
+    _LISTDATABASES._serialized_end = 2019
+    _LISTTABLES._serialized_start = 2021
+    _LISTTABLES._serialized_end = 2075
+    _LISTFUNCTIONS._serialized_start = 2077
+    _LISTFUNCTIONS._serialized_end = 2134
+    _LISTCOLUMNS._serialized_start = 2136
+    _LISTCOLUMNS._serialized_end = 2222
+    _GETDATABASE._serialized_start = 2224
+    _GETDATABASE._serialized_end = 2262
+    _GETTABLE._serialized_start = 2264
+    _GETTABLE._serialized_end = 2347
+    _GETFUNCTION._serialized_start = 2349
+    _GETFUNCTION._serialized_end = 2441
+    _DATABASEEXISTS._serialized_start = 2443
+    _DATABASEEXISTS._serialized_end = 2484
+    _TABLEEXISTS._serialized_start = 2486
+    _TABLEEXISTS._serialized_end = 2572
+    _FUNCTIONEXISTS._serialized_start = 2574
+    _FUNCTIONEXISTS._serialized_end = 2669
+    _CREATEEXTERNALTABLE._serialized_start = 2672
+    _CREATEEXTERNALTABLE._serialized_end = 2998
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_start = 2909
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_end = 2967
+    _CREATETABLE._serialized_start = 3001
+    _CREATETABLE._serialized_end = 3366
+    _CREATETABLE_OPTIONSENTRY._serialized_start = 2909
+    _CREATETABLE_OPTIONSENTRY._serialized_end = 2967
+    _DROPTEMPVIEW._serialized_start = 3368
+    _DROPTEMPVIEW._serialized_end = 3411
+    _DROPGLOBALTEMPVIEW._serialized_start = 3413
+    _DROPGLOBALTEMPVIEW._serialized_end = 3462
+    _RECOVERPARTITIONS._serialized_start = 3464
+    _RECOVERPARTITIONS._serialized_end = 3514
+    _ISCACHED._serialized_start = 3516
+    _ISCACHED._serialized_end = 3557
+    _CACHETABLE._serialized_start = 3559
+    _CACHETABLE._serialized_end = 3602
+    _UNCACHETABLE._serialized_start = 3604
+    _UNCACHETABLE._serialized_end = 3649
+    _CLEARCACHE._serialized_start = 3651
+    _CLEARCACHE._serialized_end = 3663
+    _REFRESHTABLE._serialized_start = 3665
+    _REFRESHTABLE._serialized_end = 3710
+    _REFRESHBYPATH._serialized_start = 3712
+    _REFRESHBYPATH._serialized_end = 3747
+    _CURRENTCATALOG._serialized_start = 3749
+    _CURRENTCATALOG._serialized_end = 3765
+    _SETCURRENTCATALOG._serialized_start = 3767
+    _SETCURRENTCATALOG._serialized_end = 3821
+    _LISTCATALOGS._serialized_start = 3823
+    _LISTCATALOGS._serialized_end = 3837
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.pyi b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
index 2d81a7fb97e..7df65c36413 100644
--- a/python/pyspark/sql/connect/proto/catalog_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
@@ -71,6 +71,9 @@ class Catalog(google.protobuf.message.Message):
     DROP_TEMP_VIEW_FIELD_NUMBER: builtins.int
     DROP_GLOBAL_TEMP_VIEW_FIELD_NUMBER: builtins.int
     RECOVER_PARTITIONS_FIELD_NUMBER: builtins.int
+    IS_CACHED_FIELD_NUMBER: builtins.int
+    CACHE_TABLE_FIELD_NUMBER: builtins.int
+    UNCACHE_TABLE_FIELD_NUMBER: builtins.int
     CLEAR_CACHE_FIELD_NUMBER: builtins.int
     REFRESH_TABLE_FIELD_NUMBER: builtins.int
     REFRESH_BY_PATH_FIELD_NUMBER: builtins.int
@@ -112,14 +115,13 @@ class Catalog(google.protobuf.message.Message):
     @property
     def recover_partitions(self) -> global___RecoverPartitions: ...
     @property
-    def clear_cache(self) -> global___ClearCache:
-        """TODO(SPARK-41612): Support Catalog.isCached
-        IsCached is_cached = 18;
-        TODO(SPARK-41600): Support Catalog.cacheTable
-        CacheTable cache_table = 19;
-        TODO(SPARK-41623): Support Catalog.uncacheTable
-        UncacheTable uncache_table = 20;
-        """
+    def is_cached(self) -> global___IsCached: ...
+    @property
+    def cache_table(self) -> global___CacheTable: ...
+    @property
+    def uncache_table(self) -> global___UncacheTable: ...
+    @property
+    def clear_cache(self) -> global___ClearCache: ...
     @property
     def refresh_table(self) -> global___RefreshTable: ...
     @property
@@ -150,6 +152,9 @@ class Catalog(google.protobuf.message.Message):
         drop_temp_view: global___DropTempView | None = ...,
         drop_global_temp_view: global___DropGlobalTempView | None = ...,
         recover_partitions: global___RecoverPartitions | None = ...,
+        is_cached: global___IsCached | None = ...,
+        cache_table: global___CacheTable | None = ...,
+        uncache_table: global___UncacheTable | None = ...,
         clear_cache: global___ClearCache | None = ...,
         refresh_table: global___RefreshTable | None = ...,
         refresh_by_path: global___RefreshByPath | None = ...,
@@ -160,6 +165,8 @@ class Catalog(google.protobuf.message.Message):
     def HasField(
         self,
         field_name: typing_extensions.Literal[
+            "cache_table",
+            b"cache_table",
             "cat_type",
             b"cat_type",
             "clear_cache",
@@ -186,6 +193,8 @@ class Catalog(google.protobuf.message.Message):
             b"get_function",
             "get_table",
             b"get_table",
+            "is_cached",
+            b"is_cached",
             "list_catalogs",
             b"list_catalogs",
             "list_columns",
@@ -208,11 +217,15 @@ class Catalog(google.protobuf.message.Message):
             b"set_current_database",
             "table_exists",
             b"table_exists",
+            "uncache_table",
+            b"uncache_table",
         ],
     ) -> builtins.bool: ...
     def ClearField(
         self,
         field_name: typing_extensions.Literal[
+            "cache_table",
+            b"cache_table",
             "cat_type",
             b"cat_type",
             "clear_cache",
@@ -239,6 +252,8 @@ class Catalog(google.protobuf.message.Message):
             b"get_function",
             "get_table",
             b"get_table",
+            "is_cached",
+            b"is_cached",
             "list_catalogs",
             b"list_catalogs",
             "list_columns",
@@ -261,6 +276,8 @@ class Catalog(google.protobuf.message.Message):
             b"set_current_database",
             "table_exists",
             b"table_exists",
+            "uncache_table",
+            b"uncache_table",
         ],
     ) -> None: ...
     def WhichOneof(
@@ -283,6 +300,9 @@ class Catalog(google.protobuf.message.Message):
         "drop_temp_view",
         "drop_global_temp_view",
         "recover_partitions",
+        "is_cached",
+        "cache_table",
+        "uncache_table",
         "clear_cache",
         "refresh_table",
         "refresh_by_path",
@@ -855,30 +875,65 @@ class RecoverPartitions(google.protobuf.message.Message):
 
 global___RecoverPartitions = RecoverPartitions
 
+class IsCached(google.protobuf.message.Message):
+    """See `spark.catalog.isCached`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["table_name", b"table_name"]
+    ) -> None: ...
+
+global___IsCached = IsCached
+
+class CacheTable(google.protobuf.message.Message):
+    """See `spark.catalog.cacheTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["table_name", b"table_name"]
+    ) -> None: ...
+
+global___CacheTable = CacheTable
+
+class UncacheTable(google.protobuf.message.Message):
+    """See `spark.catalog.uncacheTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["table_name", b"table_name"]
+    ) -> None: ...
+
+global___UncacheTable = UncacheTable
+
 class ClearCache(google.protobuf.message.Message):
-    """TODO(SPARK-41612): Support Catalog.isCached
-    // See `spark.catalog.isCached`
-    message IsCached {
-     // (Required)
-     string table_name = 1;
-    }
-
-    TODO(SPARK-41600): Support Catalog.cacheTable
-    // See `spark.catalog.cacheTable`
-    message CacheTable {
-     // (Required)
-     string table_name = 1;
-    }
-
-    TODO(SPARK-41623): Support Catalog.uncacheTable
-    // See `spark.catalog.uncacheTable`
-    message UncacheTable {
-     // (Required)
-     string table_name = 1;
-    }
-
-    See `spark.catalog.clearCache`
-    """
+    """See `spark.catalog.clearCache`"""
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 4957710524a..63b58dc1b56 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -2697,12 +2697,7 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
     def test_unsupported_catalog_functions(self):
         # SPARK-41939: Disable unsupported functions.
 
-        for f in (
-            "isCached",
-            "cacheTable",
-            "uncacheTable",
-            "registerFunction",
-        ):
+        for f in ("registerFunction",):
             with self.assertRaises(NotImplementedError):
                 getattr(self.connect.catalog, f)()
 
diff --git a/python/pyspark/sql/tests/connect/test_parity_catalog.py b/python/pyspark/sql/tests/connect/test_parity_catalog.py
index 3da702198ac..2b8a9d7383a 100644
--- a/python/pyspark/sql/tests/connect/test_parity_catalog.py
+++ b/python/pyspark/sql/tests/connect/test_parity_catalog.py
@@ -15,24 +15,12 @@
 # limitations under the License.
 #
 
-import unittest
-
 from pyspark.sql.tests.test_catalog import CatalogTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
 class CatalogParityTests(CatalogTestsMixin, ReusedConnectTestCase):
-    # TODO(SPARK-41612): Support Catalog.isCached
-    # TODO(SPARK-41600): Support Catalog.cacheTable
-    # TODO(SPARK-41623): Support Catalog.uncacheTable
-    @unittest.skip("Fails in Spark Connect, should enable.")
-    def test_table_cache(self):
-        super().test_table_cache()
-
-    # TODO(SPARK-41600): Support Catalog.cacheTable
-    @unittest.skip("Fails in Spark Connect, should enable.")
-    def test_refresh_table(self):
-        super().test_refresh_table()
+    pass
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 8e008cf9e1e..d3807285f3e 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -22,11 +22,6 @@ from pyspark.testing.connectutils import ReusedConnectTestCase
 
 
 class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
-    # TODO(SPARK-41612): support Catalog.isCached
-    @unittest.skip("Fails in Spark Connect, should enable.")
-    def test_cache(self):
-        super().test_cache()
-
     # TODO(SPARK-41868): Support data type Duration(NANOSECOND)
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_create_dataframe_from_pandas_with_day_time_interval(self):
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index d0fd39f4e05..dfd2f305bf8 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -1027,7 +1027,7 @@ class DataFrameTestsMixin:
         spark = self.spark
         with self.tempView("tab1", "tab2"):
             spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab1")
-            spark.createDataFrame([(2, 2), (3, 3)]).createOrReplaceTempView("tab2")
+            spark.createDataFrame([(2, 4), (3, 4)]).createOrReplaceTempView("tab2")
             self.assertFalse(spark.catalog.isCached("tab1"))
             self.assertFalse(spark.catalog.isCached("tab2"))
             spark.catalog.cacheTable("tab1")
@@ -1041,17 +1041,17 @@ class DataFrameTestsMixin:
             self.assertFalse(spark.catalog.isCached("tab1"))
             self.assertFalse(spark.catalog.isCached("tab2"))
             self.assertRaisesRegex(
-                AnalysisException,
+                Exception,
                 "does_not_exist",
                 lambda: spark.catalog.isCached("does_not_exist"),
             )
             self.assertRaisesRegex(
-                AnalysisException,
+                Exception,
                 "does_not_exist",
                 lambda: spark.catalog.cacheTable("does_not_exist"),
             )
             self.assertRaisesRegex(
-                AnalysisException,
+                Exception,
                 "does_not_exist",
                 lambda: spark.catalog.uncacheTable("does_not_exist"),
             )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org