You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/24 00:34:18 UTC
[spark] branch master updated: [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b2d249b1aa2 [SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace
b2d249b1aa2 is described below
commit b2d249b1aa2a6e9ef573a542c8585a736b2873c6
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Fri Jun 24 09:34:02 2022 +0900
[SPARK-39555][PYTHON] Make createTable and listTables in the python side support 3-layer-namespace
### What changes were proposed in this pull request?
Corresponding changes in the python side of [SPARK-39236](https://issues.apache.org/jira/browse/SPARK-39236) (Make CreateTable API and ListTables API compatible)
### Why are the changes needed?
to support 3-layer-namespace in the python side
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
1, for existing UT, update them to cover original cases;
2, for 3-layer-namespace, manually test:
- 2.1 move all `InMemoryCatalog` related files from `test` to `main`, by `mv sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemory*.scala sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/`
- 2.2 fix a few conflicts
- 2.3 compile and then manually test in the pyspark repl, for example:
```python
spark.sql("CREATE DATABASE my_db")
spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
spark.sql("CREATE TABLE my_db.tab2 (name STRING, age INT) USING parquet")
schema = StructType([StructField("a", IntegerType(), True)])
description = "this is a test table"
spark.conf.set("spark.sql.catalog.testcat", "org.apache.spark.sql.connector.catalog.InMemoryCatalog")
spark.catalog.createTable("testcat.my_db.my_table", source="json", schema=schema, description=description)
In [2]: spark.catalog.listTables()
Out[2]:
[Table(name='tab1', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)]
In [3]: spark.catalog.listTables("my_db")
Out[3]:
[Table(name='tab2', catalog='spark_catalog', namespace=['my_db'], description=None, tableType='MANAGED', isTemporary=False),
Table(name='temp_tab', catalog='spark_catalog', namespace=None, description=None, tableType='TEMPORARY', isTemporary=True)]
In [4]: spark.catalog.listTables("testcat.my_db")
Out[4]: [Table(name='my_table', catalog='testcat', namespace=['my_db'], description='this is a test table', tableType='MANAGED', isTemporary=False)]
```
Closes #36957 from zhengruifeng/py_table_create_list.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/catalog.py | 26 ++++++-
python/pyspark/sql/tests/test_catalog.py | 128 ++++++++++++++++++++-----------
2 files changed, 107 insertions(+), 47 deletions(-)
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index b954995f857..cd16be7ba19 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -37,11 +37,19 @@ class Database(NamedTuple):
class Table(NamedTuple):
name: str
- database: Optional[str]
+ catalog: Optional[str]
+ namespace: Optional[List[str]]
description: Optional[str]
tableType: str
isTemporary: bool
+ @property
+ def database(self) -> Optional[str]:
+ if self.namespace is not None and len(self.namespace) == 1:
+ return self.namespace[0]
+ else:
+ return None
+
class Column(NamedTuple):
name: str
@@ -127,6 +135,9 @@ class Catalog:
If no database is specified, the current database is used.
This includes all temporary views.
+
+ .. versionchanged:: 3.4
+ Allowed ``dbName`` to be qualified with catalog name.
"""
if dbName is None:
dbName = self.currentDatabase()
@@ -134,10 +145,18 @@ class Catalog:
tables = []
while iter.hasNext():
jtable = iter.next()
+
+ jnamespace = jtable.namespace()
+ if jnamespace is not None:
+ namespace = [jnamespace[i] for i in range(0, len(jnamespace))]
+ else:
+ namespace = None
+
tables.append(
Table(
name=jtable.name(),
- database=jtable.database(),
+ catalog=jtable.catalog(),
+ namespace=namespace,
description=jtable.description(),
tableType=jtable.tableType(),
isTemporary=jtable.isTemporary(),
@@ -341,6 +360,9 @@ class Catalog:
.. versionchanged:: 3.1
Added the ``description`` parameter.
+
+ .. versionchanged:: 3.4
+ Allowed ``tableName`` to be qualified with catalog name.
"""
if path is not None:
options["path"] = path
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 2254261263f..b2bf83d7a0d 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -79,55 +79,93 @@ class CatalogTests(ReusedSQLTestCase):
self.assertEqual(tables, tablesDefault)
self.assertEqual(len(tables), 3)
self.assertEqual(len(tablesSomeDb), 2)
- self.assertEqual(
- tables[0],
- Table(
- name="tab1",
- database="default",
- description=None,
- tableType="MANAGED",
- isTemporary=False,
- ),
- )
- self.assertEqual(
- tables[1],
- Table(
- name="tab3_via_catalog",
- database="default",
+
+ # make table in old fashion
+ def makeTable(
+ name,
+ database,
+ description,
+ tableType,
+ isTemporary,
+ ):
+ return Table(
+ name=name,
+ catalog=None,
+ namespace=[database] if database is not None else None,
description=description,
- tableType="MANAGED",
- isTemporary=False,
- ),
+ tableType=tableType,
+ isTemporary=isTemporary,
+ )
+
+ # compare tables in old fashion
+ def compareTables(t1, t2):
+ return (
+ t1.name == t2.name
+ and t1.database == t2.database
+ and t1.description == t2.description
+ and t1.tableType == t2.tableType
+ and t1.isTemporary == t2.isTemporary
+ )
+
+ self.assertTrue(
+ compareTables(
+ tables[0],
+ makeTable(
+ name="tab1",
+ database="default",
+ description=None,
+ tableType="MANAGED",
+ isTemporary=False,
+ ),
+ )
+ )
+ self.assertTrue(
+ compareTables(
+ tables[1],
+ makeTable(
+ name="tab3_via_catalog",
+ database="default",
+ description=description,
+ tableType="MANAGED",
+ isTemporary=False,
+ ),
+ )
)
- self.assertEqual(
- tables[2],
- Table(
- name="temp_tab",
- database=None,
- description=None,
- tableType="TEMPORARY",
- isTemporary=True,
- ),
+ self.assertTrue(
+ compareTables(
+ tables[2],
+ makeTable(
+ name="temp_tab",
+ database=None,
+ description=None,
+ tableType="TEMPORARY",
+ isTemporary=True,
+ ),
+ )
)
- self.assertEqual(
- tablesSomeDb[0],
- Table(
- name="tab2",
- database="some_db",
- description=None,
- tableType="MANAGED",
- isTemporary=False,
- ),
+ self.assertTrue(
+ compareTables(
+ tablesSomeDb[0],
+ makeTable(
+ name="tab2",
+ database="some_db",
+ description=None,
+ tableType="MANAGED",
+ isTemporary=False,
+ ),
+ )
)
- self.assertEqual(
- tablesSomeDb[1],
- Table(
- name="temp_tab",
- database=None,
- description=None,
- tableType="TEMPORARY",
- isTemporary=True,
- ),
+ self.assertTrue(
+ compareTables(
+ tablesSomeDb[1],
+ makeTable(
+ name="temp_tab",
+ database=None,
+ description=None,
+ tableType="TEMPORARY",
+ isTemporary=True,
+ ),
+ )
)
self.assertRaisesRegex(
AnalysisException,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org