You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/07/14 03:37:52 UTC
[phoenix-queryserver] branch master updated: PHOENIX-5994
SqlAlchemy schema filtering incorrect semantics
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git
The following commit(s) were added to refs/heads/master by this push:
new 104fc64 PHOENIX-5994 SqlAlchemy schema filtering incorrect semantics
104fc64 is described below
commit 104fc646357912613a45942209c877d3b1d80980
Author: Istvan Toth <st...@apache.org>
AuthorDate: Tue Jul 7 13:54:24 2020 +0200
PHOENIX-5994 SqlAlchemy schema filtering incorrect semantics
Also rewrite metadata and connection property handling
---
python-phoenixdb/NEWS.rst | 3 +
python-phoenixdb/README.rst | 8 +-
python-phoenixdb/phoenixdb/avatica/client.py | 80 +++++++++-----
.../phoenixdb/avatica/proto/__init__.py | 1 -
python-phoenixdb/phoenixdb/connection.py | 100 ++++++++++-------
python-phoenixdb/phoenixdb/cursor.py | 14 +--
python-phoenixdb/phoenixdb/meta.py | 96 +++++++++++++++++
python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py | 104 ++++++++----------
python-phoenixdb/phoenixdb/tests/test_db.py | 119 +++++++++++++++++++++
.../phoenixdb/tests/test_sqlalchemy.py | 41 ++++++-
10 files changed, 429 insertions(+), 137 deletions(-)
diff --git a/python-phoenixdb/NEWS.rst b/python-phoenixdb/NEWS.rst
index db0ab0f..819af84 100644
--- a/python-phoenixdb/NEWS.rst
+++ b/python-phoenixdb/NEWS.rst
@@ -16,6 +16,9 @@ Unreleased
- Removed shell example, as it was python2 only
- Updated documentation
- Added SQLAlchemy dialect
+- Implemented Avatica Metadata API
+- Misc fixes
+- Licensing cleanup
Version 0.7
-----------
diff --git a/python-phoenixdb/README.rst b/python-phoenixdb/README.rst
index 922d562..a7a32f6 100644
--- a/python-phoenixdb/README.rst
+++ b/python-phoenixdb/README.rst
@@ -60,18 +60,18 @@ necessary requirements::
You can start a Phoenix QueryServer instance on http://localhost:8765 for testing by running
the following command in the phoenix-queryserver directory::
- mvn clean verify -am -pl queryserver-it -Dtest=foo \
+ mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
-Dit.test=QueryServerBasicsIT\#startLocalPQS \
-Ddo.not.randomize.pqs.port=true -Dstart.unsecure.pqs=true
You can start a secure (https+kerberos) Phoenix QueryServer instance on https://localhost:8765
for testing by running the following command in the phoenix-queryserver directory::
- mvn clean verify -am -pl queryserver-it -Dtest=foo \
+ mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
-Dit.test=SecureQueryServerPhoenixDBIT\#startLocalPQS \
-Ddo.not.randomize.pqs.port=true -Dstart.secure.pqs=true
-this will also create a shell script in queryserver-it/target/krb_setup.sh, that you can use to set
+this will also create a shell script in phoenix-queryserver-it/target/krb_setup.sh, that you can use to set
up the environment for the tests.
If you want to use the library without installing the phoenixdb library, you can use
@@ -119,7 +119,7 @@ environments locally::
You can also run the test suite from maven as part of the Java build by setting the
run.full.python.testsuite property. You DO NOT need to set the PHOENIXDB_* enviroment variables,
maven will set them up for you. The output of the test run will be saved in
-phoenix-queryserver/queryserver-it/target/python-stdout.log and python-stderr.log::
+phoenix-queryserver/phoenix-queryserver-it/target/python-stdout.log and python-stderr.log::
mvn clean verify -Drun.full.python.testsuite=true
diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index f899c94..16885bb 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Implementation of the JSON-over-HTTP RPC protocol used by Avatica."""
+"""Implementation of the PROTOBUF-over-HTTP RPC protocol used by Avatica."""
import logging
import math
@@ -88,19 +88,6 @@ SQLSTATE_ERROR_CLASSES = [
('INT', errors.InternalError), # Phoenix internal error
]
-# Relevant properties as defined by https://calcite.apache.org/avatica/docs/client_reference.html
-OPEN_CONNECTION_PROPERTIES = (
- 'avatica_user', # User for the database connection
- 'avatica_password', # Password for the user
- 'auth',
- 'authentication',
- 'truststore',
- 'verify',
- 'do_as',
- 'user',
- 'password'
-)
-
def raise_sql_error(code, sqlstate, message):
for prefix, error_class in SQLSTATE_ERROR_CLASSES:
@@ -239,7 +226,10 @@ class AvaticaClient(object):
def get_catalogs(self, connection_id):
request = requests_pb2.CatalogsRequest()
request.connection_id = connection_id
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
def get_schemas(self, connection_id, catalog=None, schemaPattern=None):
request = requests_pb2.SchemasRequest()
@@ -248,7 +238,10 @@ class AvaticaClient(object):
request.catalog = catalog
if schemaPattern is not None:
request.schema_pattern = schemaPattern
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
def get_tables(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
request = requests_pb2.TablesRequest()
@@ -260,11 +253,12 @@ class AvaticaClient(object):
if tableNamePattern is not None:
request.table_name_pattern = tableNamePattern
if typeList is not None:
- request.type_list = typeList
- if typeList is not None:
request.type_list.extend(typeList)
request.has_type_list = typeList is not None
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
def get_columns(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
request = requests_pb2.ColumnsRequest()
@@ -277,17 +271,35 @@ class AvaticaClient(object):
request.table_name_pattern = tableNamePattern
if columnNamePattern is not None:
request.column_name_pattern = columnNamePattern
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
def get_table_types(self, connection_id):
request = requests_pb2.TableTypesRequest()
request.connection_id = connection_id
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
def get_type_info(self, connection_id):
request = requests_pb2.TypeInfoRequest()
request.connection_id = connection_id
- return self._apply(request)
+ response_data = self._apply(request, 'ResultSetResponse')
+ response = responses_pb2.ResultSetResponse()
+ response.ParseFromString(response_data)
+ return response
+
+ def connection_sync_dict(self, connection_id, connProps=None):
+ conn_props = self.connection_sync(connection_id, connProps)
+ return {
+ 'autoCommit': conn_props.auto_commit,
+ 'readOnly': conn_props.read_only,
+ 'transactionIsolation': conn_props.transaction_isolation,
+ 'catalog': conn_props.catalog,
+ 'schema': conn_props.schema}
def connection_sync(self, connection_id, connProps=None):
"""Synchronizes connection properties with the server.
@@ -301,18 +313,28 @@ class AvaticaClient(object):
:returns:
A ``common_pb2.ConnectionProperties`` object.
"""
- if connProps is None:
- connProps = {}
+ # Work on a copy: recognised keys are popped below so that whatever is left
+ # can be reported as unhandled, without mutating the caller's dict.
+ if connProps:
+ props = connProps.copy()
+ else:
+ props = {}
request = requests_pb2.ConnectionSyncRequest()
request.connection_id = connection_id
- request.conn_props.auto_commit = connProps.get('autoCommit', False)
- request.conn_props.has_auto_commit = True
- request.conn_props.read_only = connProps.get('readOnly', False)
- request.conn_props.has_read_only = True
- request.conn_props.transaction_isolation = connProps.get('transactionIsolation', 0)
- request.conn_props.catalog = connProps.get('catalog', '')
- request.conn_props.schema = connProps.get('schema', '')
+ # has_auto_commit / has_read_only mark the boolean as supplied; set them only
+ # when the caller supplied the value, so that syncing one property does not
+ # send the protobuf default (False) for the other.
+ # NOTE(review): relies on Avatica ignoring unflagged booleans - confirm
+ # against ConnectionPropertiesImpl.fromProto.
+ if 'autoCommit' in props:
+ request.conn_props.auto_commit = props.pop('autoCommit')
+ request.conn_props.has_auto_commit = True
+ if 'readOnly' in props:
+ request.conn_props.read_only = props.pop('readOnly')
+ request.conn_props.has_read_only = True
+ if 'transactionIsolation' in props:
+ request.conn_props.transaction_isolation = props.pop('transactionIsolation')
+ if 'catalog' in props:
+ request.conn_props.catalog = props.pop('catalog')
+ if 'schema' in props:
+ request.conn_props.schema = props.pop('schema')
+
+ if props:
+ # Lazy %-formatting: "str" + dict raised TypeError here.
+ logger.warning("Unhandled connection property: %s", props)
response_data = self._apply(request)
response = responses_pb2.ConnectionSyncResponse()
diff --git a/python-phoenixdb/phoenixdb/avatica/proto/__init__.py b/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
index 09697dc..ae1e83e 100644
--- a/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
+++ b/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
@@ -12,4 +12,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
diff --git a/python-phoenixdb/phoenixdb/connection.py b/python-phoenixdb/phoenixdb/connection.py
index 0930115..f752230 100644
--- a/python-phoenixdb/phoenixdb/connection.py
+++ b/python-phoenixdb/phoenixdb/connection.py
@@ -18,14 +18,17 @@ import uuid
import weakref
from phoenixdb import errors
-from phoenixdb.avatica.client import OPEN_CONNECTION_PROPERTIES
from phoenixdb.cursor import Cursor
from phoenixdb.errors import ProgrammingError
+from phoenixdb.meta import Meta
__all__ = ['Connection']
logger = logging.getLogger(__name__)
+AVATICA_PROPERTIES = ('autoCommit', 'autocommit', 'readOnly', 'readonly', 'transactionIsolation',
+ 'catalog', 'schema')
+
class Connection(object):
"""Database connection.
@@ -46,17 +49,11 @@ class Connection(object):
else:
self.cursor_factory = Cursor
self._cursors = []
- # Extract properties to pass to OpenConnectionRequest
- self._connection_args = {}
- # The rest of the kwargs
- self._filtered_args = {}
- for k in kwargs:
- if k in OPEN_CONNECTION_PROPERTIES:
- self._connection_args[k] = kwargs[k]
- else:
- self._filtered_args[k] = kwargs[k]
+ self._phoenix_props, avatica_props_init = Connection._map_conn_props(kwargs)
self.open()
- self.set_session(**self._filtered_args)
+
+ # TODO we could probably optimize it away if the defaults are not changed
+ self.set_session(**avatica_props_init)
def __del__(self):
if not self._closed:
@@ -69,10 +66,36 @@ class Connection(object):
if not self._closed:
self.close()
+ @staticmethod
+ def _default_avatica_props():
+ """Return the Avatica connection property defaults as a fresh dict."""
+ return {'autoCommit': False,
+ 'readOnly': False,
+ 'transactionIsolation': 0,
+ 'catalog': '',
+ 'schema': ''}
+
+ @staticmethod
+ def _map_conn_props(conn_props):
+ """Sorts and preprocesses args that should be passed to Phoenix and Avatica.
+
+ Keys listed in AVATICA_PROPERTIES go to Avatica (ConnectionSync), with the
+ legacy 'autocommit'/'readonly' spellings normalised; everything else is
+ passed to Phoenix as OpenConnection info.
+
+ :returns: a ``(phoenix_props, avatica_props)`` tuple of new dicts.
+ """
+ avatica_props = {k: v for k, v in conn_props.items() if k in AVATICA_PROPERTIES}
+ phoenix_props = {k: v for k, v in conn_props.items() if k not in AVATICA_PROPERTIES}
+ avatica_props = Connection._map_legacy_avatica_props(avatica_props)
+
+ return (phoenix_props, avatica_props)
+
+ @staticmethod
+ def _map_legacy_avatica_props(props):
+ """Rename 'autocommit'/'readonly' to 'autoCommit'/'readOnly' (as bools), in place.
+
+ :returns: the same ``props`` dict, for convenience.
+ """
+ if 'autocommit' in props:
+ props['autoCommit'] = bool(props.pop('autocommit'))
+ if 'readonly' in props:
+ props['readOnly'] = bool(props.pop('readonly'))
+ return props
+
def open(self):
"""Opens the connection."""
self._id = str(uuid.uuid4())
- self._client.open_connection(self._id, info=self._connection_args)
+ self._client.open_connection(self._id, info=self._phoenix_props)
def close(self):
"""Closes the connection.
@@ -83,7 +106,7 @@ class Connection(object):
be automatically called at the end of the ``with`` block.
"""
if self._closed:
- raise ProgrammingError('the connection is already closed')
+ raise ProgrammingError('The connection is already closed.')
for cursor_ref in self._cursors:
cursor = cursor_ref()
if cursor is not None and not cursor._closed:
@@ -99,12 +122,12 @@ class Connection(object):
def commit(self):
if self._closed:
- raise ProgrammingError('the connection is already closed')
+ raise ProgrammingError('The connection is already closed.')
self._client.commit(self._id)
def rollback(self):
if self._closed:
- raise ProgrammingError('the connection is already closed')
+ raise ProgrammingError('The connection is already closed.')
self._client.rollback(self._id)
def cursor(self, cursor_factory=None):
@@ -121,12 +144,12 @@ class Connection(object):
A :class:`~phoenixdb.cursor.Cursor` object.
"""
if self._closed:
- raise ProgrammingError('the connection is already closed')
+ raise ProgrammingError('The connection is already closed.')
cursor = (cursor_factory or self.cursor_factory)(self)
self._cursors.append(weakref.ref(cursor, self._cursors.remove))
return cursor
- def set_session(self, autocommit=None, readonly=None):
+ def set_session(self, **props):
"""Sets one or more parameters in the current connection.
:param autocommit:
@@ -135,50 +158,51 @@ class Connection(object):
:param readonly:
Switch the connection to read-only mode.
"""
- props = {}
- if autocommit is not None:
- props['autoCommit'] = bool(autocommit)
- if readonly is not None:
- props['readOnly'] = bool(readonly)
- props = self._client.connection_sync(self._id, props)
- self._autocommit = props.auto_commit
- self._readonly = props.read_only
- self._transactionisolation = props.transaction_isolation
+ props = Connection._map_legacy_avatica_props(props)
+ self._avatica_props = self._client.connection_sync_dict(self._id, props)
@property
def autocommit(self):
"""Read/write attribute for switching the connection's autocommit mode."""
- return self._autocommit
+ return self._avatica_props['autoCommit']
@autocommit.setter
def autocommit(self, value):
if self._closed:
- raise ProgrammingError('the connection is already closed')
- props = self._client.connection_sync(self._id, {'autoCommit': bool(value)})
- self._autocommit = props.auto_commit
+ raise ProgrammingError('The connection is already closed.')
+ self._avatica_props = self._client.connection_sync_dict(self._id, {'autoCommit': bool(value)})
@property
def readonly(self):
"""Read/write attribute for switching the connection's readonly mode."""
- return self._readonly
+ return self._avatica_props['readOnly']
@readonly.setter
def readonly(self, value):
if self._closed:
- raise ProgrammingError('the connection is already closed')
- props = self._client.connection_sync(self._id, {'readOnly': bool(value)})
- self._readonly = props.read_only
+ raise ProgrammingError('The connection is already closed.')
+ self._avatica_props = self._client.connection_sync_dict(self._id, {'readOnly': bool(value)})
@property
def transactionisolation(self):
- return self._transactionisolation
+ """Read/write attribute for the connection's transaction isolation level."""
+ # Key must match the one written by AvaticaClient.connection_sync_dict().
+ return self._avatica_props['transactionIsolation']
@transactionisolation.setter
def transactionisolation(self, value):
if self._closed:
- raise ProgrammingError('the connection is already closed')
- props = self._client.connection_sync(self._id, {'transactionIsolation': bool(value)})
- self._transactionisolation = props.transaction_isolation
+ raise ProgrammingError('The connection is already closed.')
+ # The isolation level is an integer (default 0); bool() collapsed it to 0/1.
+ self._avatica_props = self._client.connection_sync_dict(self._id, {'transactionIsolation': int(value)})
+
+ def meta(self):
+ """Creates a new metadata accessor for this connection.
+
+ :returns:
+ A :class:`~phoenixdb.meta.Meta` object.
+ :raises ProgrammingError: if the connection is already closed.
+ """
+ if self._closed:
+ raise ProgrammingError('The connection is already closed.')
+ return Meta(self)
for name in errors.__all__:
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index 1dac835..5521ebc 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -162,14 +162,16 @@ class Cursor(object):
offset=offset, frame_max_size=self.itersize)
self._set_frame(frame)
+ def _process_result(self, result):
+ if result.own_statement:
+ self._set_id(result.statement_id)
+ self._set_signature(result.signature if result.HasField('signature') else None)
+ self._set_frame(result.first_frame if result.HasField('first_frame') else None)
+ self._updatecount = result.update_count
+
def _process_results(self, results):
if results:
- result = results[0]
- if result.own_statement:
- self._set_id(result.statement_id)
- self._set_signature(result.signature if result.HasField('signature') else None)
- self._set_frame(result.first_frame if result.HasField('first_frame') else None)
- self._updatecount = result.update_count
+ return self._process_result(results[0])
def _transform_parameters(self, parameters):
typed_parameters = []
diff --git a/python-phoenixdb/phoenixdb/meta.py b/python-phoenixdb/phoenixdb/meta.py
new file mode 100644
index 0000000..18ad147
--- /dev/null
+++ b/python-phoenixdb/phoenixdb/meta.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import logging
+
+from phoenixdb.errors import ProgrammingError
+from phoenixdb.cursor import DictCursor
+
+
+__all__ = ['Meta']
+
+logger = logging.getLogger(__name__)
+
+
+class Meta(object):
+ """Database metadata accessor built on the Avatica metadata API.
+
+ Each method issues one metadata request on the owning connection and
+ returns the result rows as a list of dicts keyed by column name.
+ A pattern/catalog of '' selects rows whose value is NULL (no schema/catalog).
+ """
+
+ def __init__(self, connection):
+ self._connection = connection
+
+ def get_catalogs(self):
+ self._check_open()
+ result = self._connection._client.get_catalogs(self._connection._id)
+ return self._fetch_rows(result)
+
+ def get_schemas(self, catalog=None, schemaPattern=None):
+ self._check_open()
+ result = self._connection._client.get_schemas(self._connection._id, catalog, schemaPattern)
+ return self._fix_default(self._fetch_rows(result), schemaPattern=schemaPattern)
+
+ def get_tables(self, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
+ self._check_open()
+ result = self._connection._client.get_tables(
+ self._connection._id, catalog, schemaPattern, tableNamePattern, typeList=typeList)
+ return self._fix_default(self._fetch_rows(result), catalog, schemaPattern)
+
+ def get_columns(self, catalog=None, schemaPattern=None, tableNamePattern=None,
+ columnNamePattern=None):
+ self._check_open()
+ result = self._connection._client.get_columns(
+ self._connection._id, catalog, schemaPattern, tableNamePattern, columnNamePattern)
+ return self._fix_default(self._fetch_rows(result), catalog, schemaPattern)
+
+ def get_table_types(self):
+ self._check_open()
+ result = self._connection._client.get_table_types(self._connection._id)
+ return self._fetch_rows(result)
+
+ def get_type_info(self):
+ self._check_open()
+ result = self._connection._client.get_type_info(self._connection._id)
+ return self._fetch_rows(result)
+
+ def _check_open(self):
+ """Raise ProgrammingError if the owning connection has been closed."""
+ if self._connection._closed:
+ raise ProgrammingError('The connection is already closed.')
+
+ def _fetch_rows(self, result):
+ """Load a ResultSetResponse into a throwaway DictCursor and return all rows."""
+ with DictCursor(self._connection) as cursor:
+ cursor._process_result(result)
+ return cursor.fetchall()
+
+ def _fix_default(self, rows, catalog=None, schemaPattern=None):
+ '''Workaround for PHOENIX-6003: apply '' (= NULL) filtering client side,
+ then map NULL values to ''.'''
+ if schemaPattern == '':
+ rows = [row for row in rows if row['TABLE_SCHEM'] is None]
+ if catalog == '':
+ # JDBC names the catalog column TABLE_CAT for tables/columns but
+ # TABLE_CATALOG for schemas; accept either.
+ rows = [row for row in rows
+ if row.get('TABLE_CAT', row.get('TABLE_CATALOG')) is None]
+ # Only None becomes ''; other falsy values (0, False) are real data,
+ # e.g. NULLABLE == 0, and must be kept.
+ # Couldn't find a sane way to do it that works on 2 and 3
+ if sys.version_info.major == 3:
+ return [{k: '' if v is None else v for k, v in row.items()} for row in rows]
+ else:
+ return [{k: '' if v is None else v for k, v in row.iteritems()} for row in rows]
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index ebe8f12..ad75d6d 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -126,76 +126,66 @@ class PhoenixDialect(DefaultDialect):
))
return [phoenix_url], connect_args
- def has_table(self, connection, table_name, schema=None):
+ def has_table(self, connection, table_name, schema=None, **kw):
+ '''schema=None means the default (unnamed) schema, not "any schema"'''
if schema is None:
- query = "SELECT 1 FROM system.catalog WHERE table_name = ? LIMIT 1"
- params = [table_name.upper()]
- else:
- query = "SELECT 1 FROM system.catalog WHERE table_name = ? AND TABLE_SCHEM = ? LIMIT 1"
- params = [table_name.upper(), schema.upper()]
- return connection.execute(query, params).first() is not None
+ schema = ''
+ # Type names must match what get_table_types() reports: 'SYSTEM TABLE'
+ # (with a space); 'SYSTEM_TABLE' does not match any reported type.
+ return bool(connection.connect().connection.meta().get_tables(
+ tableNamePattern=table_name,
+ schemaPattern=schema,
+ typeList=('TABLE', 'SYSTEM TABLE')))
def get_schema_names(self, connection, **kw):
- query = "SELECT DISTINCT TABLE_SCHEM FROM SYSTEM.CATALOG"
- return [row[0] for row in connection.execute(query)]
+ schemas = connection.connect().connection.meta().get_schemas()
+ return [schema['TABLE_SCHEM'] for schema in schemas]
- def get_table_names(self, connection, schema=None, **kw):
+ def get_table_names(self, connection, schema=None, order_by=None, **kw):
+ '''order_by is ignored'''
if schema is None:
- query = "SELECT DISTINCT table_name FROM SYSTEM.CATALOG"
- params = []
- else:
- query = "SELECT DISTINCT table_name FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = ? "
- params = [schema.upper()]
- return [row[0] for row in connection.execute(query, params)]
+ schema = ''
+ # 'SYSTEM TABLE' (with a space) is the type name reported by get_table_types()
+ tables = connection.connect().connection.meta().get_tables(
+ schemaPattern=schema, typeList=('TABLE', 'SYSTEM TABLE'))
+ return [table['TABLE_NAME'] for table in tables]
+
+ def get_view_names(self, connection, schema=None, **kw):
+ '''Return the view names in schema (None means the default schema).'''
+ if schema is None:
+ schema = ''
+ # ('VIEW',) must be a tuple: a bare ('VIEW') is the string 'VIEW', which the
+ # client would extend() into the one-letter types 'V', 'I', 'E', 'W'.
+ views = connection.connect().connection.meta().get_tables(
+ schemaPattern=schema, typeList=('VIEW',))
+ # Return names, like get_table_names(), not the raw metadata rows.
+ return [view['TABLE_NAME'] for view in views]
def get_columns(self, connection, table_name, schema=None, **kw):
if schema is None:
- query = """SELECT COLUMN_NAME, DATA_TYPE, NULLABLE
- FROM system.catalog
- WHERE table_name = ?
- AND ORDINAL_POSITION is not null
- ORDER BY ORDINAL_POSITION"""
- params = [table_name.upper()]
- else:
- query = """SELECT COLUMN_NAME, DATA_TYPE, NULLABLE
- FROM system.catalog
- WHERE TABLE_SCHEM = ?
- AND table_name = ?
- AND ORDINAL_POSITION is not null
- ORDER BY ORDINAL_POSITION"""
- params = [schema.upper(), table_name.upper()]
-
- # get all of the fields for this table
- c = connection.execute(query, params)
- cols = []
- while True:
- row = c.fetchone()
- if row is None:
- break
- name = row[0]
- col_type = COLUMN_DATA_TYPE[row[1]]
- nullable = row[2] == 1 if True else False
-
- col_d = {
- 'name': name,
- 'type': col_type,
- 'nullable': nullable,
- 'default': None
- }
-
- cols.append(col_d)
- return cols
-
- # TODO This should be possible to implement
- def get_pk_constraint(self, conn, table_name, schema=None, **kw):
+ schema = ''
+ raw = connection.connect().connection.meta().get_columns(
+ schemaPattern=schema, tableNamePattern=table_name)
+ return [self._map_column(row) for row in raw]
+
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ '''Return the primary key columns of table_name, in key order.'''
+ if schema is None:
+ schema = ''
+ # kw holds SQLAlchemy reflection options, not metadata filters, so it is not
+ # forwarded (the former '*kw' spliced its keys in as positional arguments).
+ columns = connection.connect().connection.meta().get_columns(
+ schemaPattern=schema, tableNamePattern=table_name)
+ # Non-key columns presumably carry an empty KEY_SEQ (Meta maps NULL to ''),
+ # and '' > 0 raises TypeError on Python 3, so filter on truthiness and keep
+ # the key columns in primary-key order.
+ key_columns = sorted((col for col in columns if col['KEY_SEQ']),
+ key=lambda col: col['KEY_SEQ'])
+ return {'constrained_columns': [col['COLUMN_NAME'] for col in key_columns]}
+
+ def get_indexes(self, conn, table_name, schema=None, **kw):
+ '''This information does not seem to be exposed via Avatica
+ TODO: Implement by directly querying SYSTEM tables ? '''
return []
def get_foreign_keys(self, conn, table_name, schema=None, **kw):
+ '''Foreign keys are a foreign concept to Phoenix,
+ but SqlAlchemy cannot parse the DB schema if it's not implemented '''
return []
- # TODO This should be possible to implement
- def get_indexes(self, conn, table_name, schema=None, **kw):
- return []
+ def _map_column(self, raw):
+ '''Convert one get_columns() metadata row into a SQLAlchemy column dict.'''
+ cooked = {}
+ cooked['name'] = raw['COLUMN_NAME']
+ cooked['type'] = COLUMN_DATA_TYPE[raw['TYPE_ID']]
+ # IS_NULLABLE / IS_AUTOINCREMENT are textual flags in JDBC metadata, so bool()
+ # of a non-empty 'NO'/'false' is True. Use the numeric NULLABLE column
+ # (1 == columnNullable), as the previous SYSTEM.CATALOG query did.
+ cooked['nullable'] = raw['NULLABLE'] == 1
+ cooked['autoincrement'] = str(raw['IS_AUTOINCREMENT']).upper() in ('YES', 'TRUE')
+ cooked['comment'] = raw['REMARKS']
+ cooked['default'] = None # Not apparent how to get this from the metadata
+ return cooked
class TINYINT(types.Integer):
diff --git a/python-phoenixdb/phoenixdb/tests/test_db.py b/python-phoenixdb/phoenixdb/tests/test_db.py
index 04f19d9..da12b23 100644
--- a/python-phoenixdb/phoenixdb/tests/test_db.py
+++ b/python-phoenixdb/phoenixdb/tests/test_db.py
@@ -16,6 +16,7 @@
import unittest
import phoenixdb.cursor
+from phoenixdb.connection import Connection
from phoenixdb.errors import InternalError
from phoenixdb.tests import DatabaseTestCase, TEST_DB_URL
@@ -107,3 +108,121 @@ class PhoenixDatabaseTest(DatabaseTestCase):
self.conn.autocommit = True
cursor.execute("SELECT * FROM test ORDER BY id")
self.assertEqual(cursor.fetchall(), [[1, 'one'], [2, 'two']])
+
+ def test_conn_props(self):
+ phoenix_args, avatica_args = Connection._map_conn_props(
+ {'autoCommit': True,
+ 'readonly': True,
+ 'transactionIsolation': 3,
+ 'schema': 'bubu',
+ 'phoenixArg': 'phoenixArg'})
+ self.assertEqual(phoenix_args, {'phoenixArg': 'phoenixArg'})
+ self.assertEqual(avatica_args, {'autoCommit': True,
+ 'readOnly': True,
+ 'transactionIsolation': 3,
+ 'schema': 'bubu'})
+
+ def test_meta(self):
+ with self.conn.cursor() as cursor:
+ try:
+ cursor.execute('drop table if exists DEFAULT_TABLE')
+ cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+ cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+ cursor.execute('create table DEFAULT_TABLE (ID integer primary key)')
+ cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
+ cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')
+
+ meta = self.conn.meta()
+
+ self.assertEqual(meta.get_catalogs(), [])
+
+ self.assertEqual(meta.get_schemas(), [
+ {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''},
+ {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''},
+ {'TABLE_SCHEM': 'B_SCHEMA', 'TABLE_CATALOG': ''},
+ {'TABLE_SCHEM': 'SYSTEM', 'TABLE_CATALOG': ''}])
+
+ self.assertEqual(meta.get_schemas(schemaPattern=''), [
+ {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''}])
+
+ self.assertEqual(meta.get_schemas(schemaPattern='A_SCHEMA'), [
+ {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''}])
+
+ a_tables = meta.get_tables()
+ self.assertTrue(len(a_tables) > 3) # Don't know how many tables SYSTEM has
+
+ a_tables = meta.get_tables(schemaPattern='')
+ self.assertEqual(len(a_tables), 1)
+ self.assertTrue(a_tables[0]['TABLE_NAME'] == 'DEFAULT_TABLE')
+
+ a_tables = meta.get_tables(schemaPattern='A_SCHEMA')
+ self.assertEqual(len(a_tables), 1)
+ self.assertTrue(a_tables[0]['TABLE_NAME'] == 'A_TABLE')
+
+ a_columns = meta.get_columns(schemaPattern='A_SCHEMA', tableNamePattern='A_TABLE')
+ self.assertEqual(len(a_columns), 1)
+ self.assertTrue(a_columns[0]['COLUMN_NAME'] == 'ID_A')
+
+ self.assertTrue(all(elem in meta.get_table_types() for elem in [
+ {'TABLE_TYPE': 'INDEX'},
+ {'TABLE_TYPE': 'SEQUENCE'},
+ {'TABLE_TYPE': 'SYSTEM TABLE'},
+ {'TABLE_TYPE': 'TABLE'},
+ {'TABLE_TYPE': 'VIEW'}]))
+
+ self.assertEqual(meta.get_type_info(), [])
+ finally:
+ cursor.execute('drop table if exists DEFAULT_TABLE')
+ cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+ cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+ @unittest.skip("https://issues.apache.org/jira/browse/PHOENIX-6004")
+ def test_case_sensitivity(self):
+ with self.conn.cursor() as cursor:
+ try:
+ cursor.execute('drop table if exists AAA')
+ cursor.execute('drop table if exists "aaa"')
+ cursor.execute('drop table if exists "Aaa"')
+
+ cursor.execute('create table AAA (ID integer primary key, YYY integer)')
+ cursor.execute('create table "aaa" ("ID_x" integer primary key, YYY integer, "Yyy" integer, "yyy" integer)')
+ cursor.execute('create table "Aaa" (ID_X integer primary key, ZZZ integer, "Zzz" integer, "zzz" integer)')
+
+ cursor.execute('upsert into AAA values (1, 2)')
+ cursor.execute('upsert into "aaa" values (11, 12, 13, 14)')
+ cursor.execute('upsert into "Aaa" values (21, 22, 23, 24)')
+
+ cursor.execute('select YYY from AAA')
+ self.assertEqual(cursor.fetchone(), [2])
+
+ cursor.execute('select YYY from "aaa"')
+ self.assertEqual(cursor.fetchone(), [12])
+
+ cursor.execute('select "YYY" from "aaa"')
+ self.assertEqual(cursor.fetchone(), [12])
+
+ cursor.execute('select "Yyy" from "aaa"')
+ self.assertEqual(cursor.fetchone(), [13])
+
+ meta = self.conn.meta()
+
+ self.assertEqual(len(meta.get_tables(schemaPattern='')), 3)
+
+ self.assertEqual(len(meta.get_tables(schemaPattern='',
+ tableNamePattern='AAA')), 1)
+ self.assertEqual(len(meta.get_tables(schemaPattern='',
+ tableNamePattern='"aaa"')), 1)
+ # get_columns() returns a list of rows, so compare its length.
+ self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
+ columnNamePattern='YYY')), 1)
+ self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
+ columnNamePattern='yyy')), 1)
+ self.assertEqual(len(meta.get_columns(tableNamePattern='AAA',
+ columnNamePattern='"yyy"')), 0)
+ finally:
+ cursor.execute('drop table if exists AAA')
+ cursor.execute('drop table if exists "aaa"')
+ cursor.execute('drop table if exists "Aaa"')
diff --git a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
index 52bff73..99df3be 100644
--- a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
+++ b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
@@ -35,7 +35,7 @@ class SQLAlchemyTest(unittest.TestCase):
engine = self._create_engine()
# connection = engine.connect()
metadata = db.MetaData()
- catalog = db.Table('CATALOG', metadata, autoload=True, autoload_with=engine)
+ catalog = db.Table('CATALOG', metadata, schema='SYSTEM', autoload=True, autoload_with=engine)
self.assertIn('TABLE_NAME', catalog.columns.keys())
def test_textual(self):
@@ -52,6 +52,43 @@ class SQLAlchemyTest(unittest.TestCase):
finally:
connection.execute('drop table if exists ALCHEMY_TEST')
+ def test_schema_filtering(self):
+ engine = self._create_engine()
+ with engine.connect() as connection:
+ try:
+ connection.execute('drop table if exists ALCHEMY_TEST')
+ connection.execute('drop table if exists A.ALCHEMY_TEST_A')
+ connection.execute('drop table if exists B.ALCHEMY_TEST_B')
+
+ connection.execute(text('create table ALCHEMY_TEST (ID integer primary key)'))
+ connection.execute(text('create table A.ALCHEMY_TEST_A (ID_A integer primary key)'))
+ connection.execute(text('create table B.ALCHEMY_TEST_B (ID_B integer primary key)'))
+
+ inspector = db.inspect(engine)
+
+ self.assertEqual(inspector.get_schema_names(), ['', 'A', 'B', 'SYSTEM'])
+
+ self.assertEqual(inspector.get_table_names(), ['ALCHEMY_TEST'])
+ self.assertEqual(inspector.get_table_names(''), ['ALCHEMY_TEST'])
+ self.assertEqual(inspector.get_table_names('A'), ['ALCHEMY_TEST_A'])
+ self.assertEqual(inspector.get_table_names('B'), ['ALCHEMY_TEST_B'])
+
+ self.assertEqual(inspector.get_columns('ALCHEMY_TEST').pop()['name'], 'ID')
+ self.assertEqual(
+ inspector.get_columns('ALCHEMY_TEST', '').pop()['name'], 'ID')
+ self.assertEqual(
+ inspector.get_columns('ALCHEMY_TEST_A', 'A').pop()['name'], 'ID_A')
+
+ self.assertTrue(engine.has_table('ALCHEMY_TEST'))
+ self.assertFalse(engine.has_table('ALCHEMY_TEST', 'A'))
+ self.assertTrue(engine.has_table('ALCHEMY_TEST_A', 'A'))
+ # schema=None means the default schema, so A's table must not be found there
+ self.assertFalse(engine.has_table('ALCHEMY_TEST_A'))
+ finally:
+ connection.execute('drop table if exists ALCHEMY_TEST')
+ connection.execute('drop table if exists A.ALCHEMY_TEST_A')
+ connection.execute('drop table if exists B.ALCHEMY_TEST_B')
+
def test_reflection(self):
engine = self._create_engine()
with engine.connect() as connection:
@@ -65,7 +102,7 @@ class SQLAlchemyTest(unittest.TestCase):
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city))'''))
- columns_result = inspector.get_columns('us_population')
+ columns_result = inspector.get_columns('US_POPULATION')
self.assertEqual(len(columns_result), 3)
finally:
connection.execute('drop table if exists us_population')