You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/24 00:34:18 UTC

[spark] branch master updated: [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b2d249b1aa2 [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace
b2d249b1aa2 is described below

commit b2d249b1aa2a6e9ef573a542c8585a736b2873c6
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Fri Jun 24 09:34:02 2022 +0900

    [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace
    
    ### What changes were proposed in this pull request?
    
    Corresponding changes on the Python side for [SPARK-39236](https://issues.apache.org/jira/browse/SPARK-39236) (Make CreateTable API and ListTables API compatible).
    
    ### Why are the changes needed?
    
    To support the 3-layer namespace on the Python side.
    
    ### Does this PR introduce _any_ user-facing change?
    
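    Yes. `Table` (as returned by `listTables`) now has `catalog` and `namespace` fields in place of `database`; `database` remains available as a read-only property that returns the namespace's single element when the namespace has exactly one level, and `None` otherwise.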
    
    ### How was this patch tested?
    1. Updated the existing unit tests so that they still cover the original cases;
    
    2. For the 3-layer namespace, tested manually:
      - 2.1 Moved all `InMemoryCatalog`-related files from `test` to `main` via `mv sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemory*.scala sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/`
      - 2.2 Fixed a few conflicts
      - 2.3 Compiled, then tested manually in the PySpark REPL, for example:
    
    ```python
    
    spark.sql("CREATE DATABASE my_db")
    spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
    spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
    spark.sql("CREATE TABLE my_db.tab2 (name STRING, age INT) USING parquet")
    
    from pyspark.sql.types import IntegerType, StructField, StructType
    schema = StructType([StructField("a", IntegerType(), True)])
    description = "this is a test table"
    
    spark.conf.set("spark.sql.catalog.testcat", "org.apache.spark.sql.connector.catalog.InMemoryCatalog")
    
    spark.catalog.createTable("testcat.my_db.my_table", source="json", schema=schema, description=description)
    
    In [2]: spark.catalog.listTables()
    Out[2]:
    [Table(name='tab1', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
     Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)]
    
    In [3]: spark.catalog.listTables("my_db")
    Out[3]:
    [Table(name='tab2', catalog='spark_catalog', namespace=['my_db'], description=None, tableType='MANAGED', isTemporary=False),
     Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)]
    
    In [4]: spark.catalog.listTables("testcat.my_db")
    Out[4]: [Table(name='my_table', catalog='testcat', namespace=['my_db'], description='this is a test table', tableType='MANAGED', isTemporary=False)]
    ```
    
    Closes #36957 from zhengruifeng/py_table_create_list.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/catalog.py            |  26 ++++++-
 python/pyspark/sql/tests/test_catalog.py | 128 ++++++++++++++++++++-----------
 2 files changed, 107 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index b954995f857..cd16be7ba19 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -37,11 +37,19 @@ class Database(NamedTuple):
 
 class Table(NamedTuple):
     name: str
-    database: Optional[str]
+    catalog: Optional[str]
+    namespace: Optional[List[str]]
     description: Optional[str]
     tableType: str
     isTemporary: bool
 
+    @property
+    def database(self) -> Optional[str]:
+        if self.namespace is not None and len(self.namespace) == 1:
+            return self.namespace[0]
+        else:
+            return None
+
 
 class Column(NamedTuple):
     name: str
@@ -127,6 +135,9 @@ class Catalog:
 
         If no database is specified, the current database is used.
         This includes all temporary views.
+
+        .. versionchanged:: 3.4
+           Allowed ``dbName`` to be qualified with catalog name.
         """
         if dbName is None:
             dbName = self.currentDatabase()
@@ -134,10 +145,18 @@ class Catalog:
         tables = []
         while iter.hasNext():
             jtable = iter.next()
+
+            jnamespace = jtable.namespace()
+            if jnamespace is not None:
+                namespace = [jnamespace[i] for i in range(0, len(jnamespace))]
+            else:
+                namespace = None
+
             tables.append(
                 Table(
                     name=jtable.name(),
-                    database=jtable.database(),
+                    catalog=jtable.catalog(),
+                    namespace=namespace,
                     description=jtable.description(),
                     tableType=jtable.tableType(),
                     isTemporary=jtable.isTemporary(),
@@ -341,6 +360,9 @@ class Catalog:
 
         .. versionchanged:: 3.1
            Added the ``description`` parameter.
+
+        .. versionchanged:: 3.4
+           Allowed ``tableName`` to be qualified with catalog name.
         """
         if path is not None:
             options["path"] = path
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 2254261263f..b2bf83d7a0d 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -79,55 +79,93 @@ class CatalogTests(ReusedSQLTestCase):
                     self.assertEqual(tables, tablesDefault)
                     self.assertEqual(len(tables), 3)
                     self.assertEqual(len(tablesSomeDb), 2)
-                    self.assertEqual(
-                        tables[0],
-                        Table(
-                            name="tab1",
-                            database="default",
-                            description=None,
-                            tableType="MANAGED",
-                            isTemporary=False,
-                        ),
-                    )
-                    self.assertEqual(
-                        tables[1],
-                        Table(
-                            name="tab3_via_catalog",
-                            database="default",
+
+                    # make table in old fashion
+                    def makeTable(
+                        name,
+                        database,
+                        description,
+                        tableType,
+                        isTemporary,
+                    ):
+                        return Table(
+                            name=name,
+                            catalog=None,
+                            namespace=[database] if database is not None else None,
                             description=description,
-                            tableType="MANAGED",
-                            isTemporary=False,
-                        ),
+                            tableType=tableType,
+                            isTemporary=isTemporary,
+                        )
+
+                    # compare tables in old fashion
+                    def compareTables(t1, t2):
+                        return (
+                            t1.name == t2.name
+                            and t1.database == t2.database
+                            and t1.description == t2.description
+                            and t1.tableType == t2.tableType
+                            and t1.isTemporary == t2.isTemporary
+                        )
+
+                    self.assertTrue(
+                        compareTables(
+                            tables[0],
+                            makeTable(
+                                name="tab1",
+                                database="default",
+                                description=None,
+                                tableType="MANAGED",
+                                isTemporary=False,
+                            ),
+                        )
+                    )
+                    self.assertTrue(
+                        compareTables(
+                            tables[1],
+                            makeTable(
+                                name="tab3_via_catalog",
+                                database="default",
+                                description=description,
+                                tableType="MANAGED",
+                                isTemporary=False,
+                            ),
+                        )
                     )
-                    self.assertEqual(
-                        tables[2],
-                        Table(
-                            name="temp_tab",
-                            database=None,
-                            description=None,
-                            tableType="TEMPORARY",
-                            isTemporary=True,
-                        ),
+                    self.assertTrue(
+                        compareTables(
+                            tables[2],
+                            makeTable(
+                                name="temp_tab",
+                                database=None,
+                                description=None,
+                                tableType="TEMPORARY",
+                                isTemporary=True,
+                            ),
+                        )
                     )
-                    self.assertEqual(
-                        tablesSomeDb[0],
-                        Table(
-                            name="tab2",
-                            database="some_db",
-                            description=None,
-                            tableType="MANAGED",
-                            isTemporary=False,
-                        ),
+                    self.assertTrue(
+                        compareTables(
+                            tablesSomeDb[0],
+                            makeTable(
+                                name="tab2",
+                                database="some_db",
+                                description=None,
+                                tableType="MANAGED",
+                                isTemporary=False,
+                            ),
+                        )
                     )
-                    self.assertEqual(
-                        tablesSomeDb[1],
-                        Table(
-                            name="temp_tab",
-                            database=None,
-                            description=None,
-                            tableType="TEMPORARY",
-                            isTemporary=True,
-                        ),
+                    self.assertTrue(
+                        compareTables(
+                            tablesSomeDb[1],
+                            makeTable(
+                                name="temp_tab",
+                                database=None,
+                                description=None,
+                                tableType="TEMPORARY",
+                                isTemporary=True,
+                            ),
+                        )
                     )
                     self.assertRaisesRegex(
                         AnalysisException,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org