You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "cccs-eric (via GitHub)" <gi...@apache.org> on 2023/06/27 13:17:06 UTC

[GitHub] [iceberg] cccs-eric commented on a diff in pull request #7921: Initial code for pyiceberg JDBC Catalog

cccs-eric commented on code in PR #7921:
URL: https://github.com/apache/iceberg/pull/7921#discussion_r1243721594


##########
python/pyiceberg/catalog/jdbc.py:
##########
@@ -0,0 +1,475 @@
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Union,
+)
+from pyiceberg.catalog import (
+    ICEBERG,
+    METADATA_LOCATION,
+    PREVIOUS_METADATA_LOCATION,
+    TABLE_TYPE,
+    Catalog,
+    Identifier,
+    Properties,
+    PropertiesUpdateSummary,
+)
+from pyiceberg.exceptions import (
+    ConditionalCheckFailedException,
+    GenericDynamoDbError,
+    NamespaceAlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchIcebergTableError,
+    NoSuchNamespaceError,
+    NoSuchPropertyException,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Union,
+)
+import psycopg2 as db
+from psycopg2 import Error
+from psycopg2.extras import DictCursor
+import sqlite3
+from urllib.parse import urlparse
+
+JDBC_URI = "uri"
+
+# Catalog tables
+CATALOG_TABLE_NAME = "iceberg_tables"
+CATALOG_NAME = "catalog_name"
+TABLE_NAMESPACE = "table_namespace"
+TABLE_NAME = "table_name"
+METADATA_LOCATION = "metadata_location"
+PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
+
+# Catalog SQL statements
+CREATE_CATALOG_TABLE = f"CREATE TABLE {CATALOG_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL, {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, {TABLE_NAME} VARCHAR(255) NOT NULL, {METADATA_LOCATION} VARCHAR(1000), {PREVIOUS_METADATA_LOCATION} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME}, {TABLE_NAMESPACE}, {TABLE_NAME}))"
+LIST_TABLES_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s"
+GET_TABLE_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s"
+DROP_TABLE_SQL = f"DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+DO_COMMIT_CREATE_TABLE_SQL = f"INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_NAME}, {TABLE_NAMESPACE} , {TABLE_NAME} , {METADATA_LOCATION}, {PREVIOUS_METADATA_LOCATION}) VALUES (%s,%s,%s,%s,null)"
+RENAME_TABLE_SQL = f"UPDATE {CATALOG_TABLE_NAME} SET {TABLE_NAMESPACE} = %s, {TABLE_NAME} = %s WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+
+GET_NAMESPACE_SQL = f"SELECT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} LIKE %s LIMIT 1"
+LIST_ALL_TABLE_NAMESPACES_SQL = f"SELECT DISTINCT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+
+# Catalog Namespace Properties
+NAMESPACE_EXISTS_PROPERTY = "exists"
+NAMESPACE_MINIMAL_PROPERTIES = {NAMESPACE_EXISTS_PROPERTY: "true"}
+NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"
+NAMESPACE_NAME = "namespace"
+NAMESPACE_PROPERTY_KEY = "property_key"
+NAMESPACE_PROPERTY_VALUE = "property_value"
+
+# Catalog Namespace SQL statements
+CREATE_NAMESPACE_PROPERTIES_TABLE = f"CREATE TABLE {NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_PROPERTY_KEY} VARCHAR(255), {NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME}, {NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
+INSERT_NAMESPACE_PROPERTIES_SQL = f"INSERT INTO {NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME}, {NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}, {NAMESPACE_PROPERTY_VALUE}) VALUES "
+INSERT_PROPERTIES_VALUES_BASE = f"(%s,%s,%s,%s)"
+LIST_ALL_PROPERTY_NAMESPACES_SQL = f"SELECT DISTINCT {NAMESPACE_NAME} FROM {NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+DELETE_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM {NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+DELETE_ALL_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM {NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {NAMESPACE_NAME} = %s"
+UPDATE_NAMESPACE_PROPERTIES_START_SQL = f"UPDATE {NAMESPACE_PROPERTIES_TABLE_NAME} SET {NAMESPACE_PROPERTY_VALUE} = CASE"
+UPDATE_NAMESPACE_PROPERTIES_END_SQL = f" END WHERE {CATALOG_NAME} = %s AND {NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+
+
+GET_NAMESPACE_PROPERTIES_SQL = f"SELECT {NAMESPACE_NAME} FROM {NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {NAMESPACE_NAME} LIKE %s LIMIT 1"
+GET_ALL_NAMESPACE_PROPERTIES_SQL = f"SELECT * FROM {NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND {NAMESPACE_NAME} = %s"
+
+# Custom SQL not from JDBCCatalog.java
+LIST_ALL_NAMESPACES_SQL = f"""
+SELECT DISTINCT ns FROM
+(
+    SELECT {TABLE_NAMESPACE} AS ns FROM {CATALOG_TABLE_NAME} 
+    WHERE {CATALOG_NAME} = %s 
+    UNION
+    SELECT {NAMESPACE_NAME} AS ns FROM {NAMESPACE_PROPERTIES_TABLE_NAME} 
+    WHERE {CATALOG_NAME} = %s
+) AS all_catalog_namespaces
+"""
+
+def _sqlite(**properties: str) -> Any:
+    parsed_uri = urlparse(properties.get("uri"))
+    return sqlite3.connect(database=parsed_uri.path, uri=False)
+
+def _postgresql(**properties: str) -> Any:
+    parsed_uri = urlparse(properties.get("uri"))
+    postgresql_props = {
+        "user": parsed_uri.username,
+        "password": parsed_uri.password,
+        "dbname": parsed_uri.path[1:],
+        "host": parsed_uri.hostname,
+        "port": parsed_uri.port,
+    }
+
+    return db.connect(**postgresql_props)
+
+SCHEME_TO_DB = {
+    "file": _sqlite,
+    "postgresql": _postgresql,
+}
+
+class JDBCCatalog(Catalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        # Get a database connection for a specific scheme.
+        uri = urlparse(self.properties.get("uri"))
+        uri_scheme = str(uri.scheme)
+        if uri_scheme not in SCHEME_TO_DB:
+            raise ValueError(f"No registered database for scheme: {uri_scheme}")
+        self._get_db_connection = SCHEME_TO_DB[uri_scheme]
+
+    def initialize_catalog_tables(self) -> None:
+        with self._get_db_connection(**self.properties) as conn:
+            try:
+                curs = conn.cursor()

Review Comment:
   I am totally with you. That's what I had originally, but then I tried to make the code portable between SQLite and Postgres. I ran into issues using a context manager with cursors in SQLite, so I decided to close the cursor manually. It turns out that making this portable is more of a PITA than I thought, since the two don't share a common way to express parameters in a statement (Postgres uses `%s` and SQLite uses `?`). I'm not sure we want to go down that route; if not, I am all for using a context manager. I'll proceed with that change, and if we ever want to support SQLite, we can revisit this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org