You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/07/14 03:37:52 UTC

[phoenix-queryserver] branch master updated: PHOENIX-5994 SqlAlchemy schema filtering incorrect semantics

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 104fc64  PHOENIX-5994 SqlAlchemy schema filtering incorrect semantics
104fc64 is described below

commit 104fc646357912613a45942209c877d3b1d80980
Author: Istvan Toth <st...@apache.org>
AuthorDate: Tue Jul 7 13:54:24 2020 +0200

    PHOENIX-5994 SqlAlchemy schema filtering incorrect semantics
    
    Also rewrite metadata and connection property handling
---
 python-phoenixdb/NEWS.rst                          |   3 +
 python-phoenixdb/README.rst                        |   8 +-
 python-phoenixdb/phoenixdb/avatica/client.py       |  80 +++++++++-----
 .../phoenixdb/avatica/proto/__init__.py            |   1 -
 python-phoenixdb/phoenixdb/connection.py           | 100 ++++++++++-------
 python-phoenixdb/phoenixdb/cursor.py               |  14 +--
 python-phoenixdb/phoenixdb/meta.py                 |  96 +++++++++++++++++
 python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py   | 104 ++++++++----------
 python-phoenixdb/phoenixdb/tests/test_db.py        | 119 +++++++++++++++++++++
 .../phoenixdb/tests/test_sqlalchemy.py             |  41 ++++++-
 10 files changed, 429 insertions(+), 137 deletions(-)

diff --git a/python-phoenixdb/NEWS.rst b/python-phoenixdb/NEWS.rst
index db0ab0f..819af84 100644
--- a/python-phoenixdb/NEWS.rst
+++ b/python-phoenixdb/NEWS.rst
@@ -16,6 +16,9 @@ Unreleased
 - Removed shell example, as it was python2 only
 - Updated documentation
 - Added SQLAlchemy dialect
+- Implemented Avatica Metadata API
+- Misc fixes
+- Licensing cleanup
 
 Version 0.7
 -----------
diff --git a/python-phoenixdb/README.rst b/python-phoenixdb/README.rst
index 922d562..a7a32f6 100644
--- a/python-phoenixdb/README.rst
+++ b/python-phoenixdb/README.rst
@@ -60,18 +60,18 @@ necessary requirements::
 You can start a Phoenix QueryServer instance on http://localhost:8765 for testing by running
 the following command in the phoenix-queryserver directory::
 
-    mvn clean verify -am -pl queryserver-it -Dtest=foo \
+    mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
     -Dit.test=QueryServerBasicsIT\#startLocalPQS \
     -Ddo.not.randomize.pqs.port=true -Dstart.unsecure.pqs=true
 
 You can start a secure (https+kerberos) Phoenix QueryServer instance on https://localhost:8765
 for testing by running the following command in the phoenix-queryserver directory::
 
-    mvn clean verify -am -pl queryserver-it -Dtest=foo \
+    mvn clean verify -am -pl phoenix-queryserver-it -Dtest=foo \
     -Dit.test=SecureQueryServerPhoenixDBIT\#startLocalPQS \
     -Ddo.not.randomize.pqs.port=true -Dstart.secure.pqs=true
 
-this will also create a shell script in queryserver-it/target/krb_setup.sh, that you can use to set
+this will also create a shell script in phoenix-queryserver-it/target/krb_setup.sh, that you can use to set
 up the environment for the tests.
 
 If you want to use the library without installing the phoenixdb library, you can use
@@ -119,7 +119,7 @@ environments locally::
 You can also run the test suite from maven as part of the Java build by setting the 
 run.full.python.testsuite property. You DO NOT need to set the PHOENIXDB_* enviroment variables,
 maven will set them up for you. The output of the test run will be saved in
-phoenix-queryserver/queryserver-it/target/python-stdout.log and python-stderr.log::
+phoenix-queryserver/phoenix-queryserver-it/target/python-stdout.log and python-stderr.log::
 
     mvn clean verify -Drun.full.python.testsuite=true
 
diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index f899c94..16885bb 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Implementation of the JSON-over-HTTP RPC protocol used by Avatica."""
+"""Implementation of the PROTOBUF-over-HTTP RPC protocol used by Avatica."""
 
 import logging
 import math
@@ -88,19 +88,6 @@ SQLSTATE_ERROR_CLASSES = [
     ('INT', errors.InternalError),  # Phoenix internal error
 ]
 
-# Relevant properties as defined by https://calcite.apache.org/avatica/docs/client_reference.html
-OPEN_CONNECTION_PROPERTIES = (
-    'avatica_user',  # User for the database connection
-    'avatica_password',  # Password for the user
-    'auth',
-    'authentication',
-    'truststore',
-    'verify',
-    'do_as',
-    'user',
-    'password'
-)
-
 
 def raise_sql_error(code, sqlstate, message):
     for prefix, error_class in SQLSTATE_ERROR_CLASSES:
@@ -239,7 +226,10 @@ class AvaticaClient(object):
     def get_catalogs(self, connection_id):
         request = requests_pb2.CatalogsRequest()
         request.connection_id = connection_id
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
 
     def get_schemas(self, connection_id, catalog=None, schemaPattern=None):
         request = requests_pb2.SchemasRequest()
@@ -248,7 +238,10 @@ class AvaticaClient(object):
             request.catalog = catalog
         if schemaPattern is not None:
             request.schema_pattern = schemaPattern
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
 
     def get_tables(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
         request = requests_pb2.TablesRequest()
@@ -260,11 +253,12 @@ class AvaticaClient(object):
         if tableNamePattern is not None:
             request.table_name_pattern = tableNamePattern
         if typeList is not None:
-            request.type_list = typeList
-        if typeList is not None:
             request.type_list.extend(typeList)
         request.has_type_list = typeList is not None
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
 
     def get_columns(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
         request = requests_pb2.ColumnsRequest()
@@ -277,17 +271,35 @@ class AvaticaClient(object):
             request.table_name_pattern = tableNamePattern
         if columnNamePattern is not None:
             request.column_name_pattern = columnNamePattern
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
 
     def get_table_types(self, connection_id):
         request = requests_pb2.TableTypesRequest()
         request.connection_id = connection_id
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
 
     def get_type_info(self, connection_id):
         request = requests_pb2.TypeInfoRequest()
         request.connection_id = connection_id
-        return self._apply(request)
+        response_data = self._apply(request, 'ResultSetResponse')
+        response = responses_pb2.ResultSetResponse()
+        response.ParseFromString(response_data)
+        return response
+
+    def connection_sync_dict(self, connection_id, connProps=None):
+        conn_props = self.connection_sync(connection_id, connProps)
+        return {
+            'autoCommit': conn_props.auto_commit,
+            'readOnly': conn_props.read_only,
+            'transactionIsolation': conn_props.transaction_isolation,
+            'catalog': conn_props.catalog,
+            'schema': conn_props.schema}
 
     def connection_sync(self, connection_id, connProps=None):
         """Synchronizes connection properties with the server.
@@ -301,18 +313,28 @@ class AvaticaClient(object):
         :returns:
             A ``common_pb2.ConnectionProperties`` object.
         """
-        if connProps is None:
-            connProps = {}
+        if connProps:
+            props = connProps.copy()
+        else:
+            props = {}
 
         request = requests_pb2.ConnectionSyncRequest()
         request.connection_id = connection_id
-        request.conn_props.auto_commit = connProps.get('autoCommit', False)
         request.conn_props.has_auto_commit = True
-        request.conn_props.read_only = connProps.get('readOnly', False)
         request.conn_props.has_read_only = True
-        request.conn_props.transaction_isolation = connProps.get('transactionIsolation', 0)
-        request.conn_props.catalog = connProps.get('catalog', '')
-        request.conn_props.schema = connProps.get('schema', '')
+        if 'autoCommit' in props:
+            request.conn_props.auto_commit = props.pop('autoCommit')
+        if 'readOnly' in props:
+            request.conn_props.read_only = props.pop('readOnly')
+        if 'transactionIsolation' in props:
+            request.conn_props.transaction_isolation = props.pop('transactionIsolation', None)
+        if 'catalog' in props:
+            request.conn_props.catalog = props.pop('catalog', None)
+        if 'schema' in props:
+            request.conn_props.schema = props.pop('schema', None)
+
+        if props:
+            logger.warning("Unhandled connection property:" + props)
 
         response_data = self._apply(request)
         response = responses_pb2.ConnectionSyncResponse()
diff --git a/python-phoenixdb/phoenixdb/avatica/proto/__init__.py b/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
index 09697dc..ae1e83e 100644
--- a/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
+++ b/python-phoenixdb/phoenixdb/avatica/proto/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
diff --git a/python-phoenixdb/phoenixdb/connection.py b/python-phoenixdb/phoenixdb/connection.py
index 0930115..f752230 100644
--- a/python-phoenixdb/phoenixdb/connection.py
+++ b/python-phoenixdb/phoenixdb/connection.py
@@ -18,14 +18,17 @@ import uuid
 import weakref
 
 from phoenixdb import errors
-from phoenixdb.avatica.client import OPEN_CONNECTION_PROPERTIES
 from phoenixdb.cursor import Cursor
 from phoenixdb.errors import ProgrammingError
+from phoenixdb.meta import Meta
 
 __all__ = ['Connection']
 
 logger = logging.getLogger(__name__)
 
+AVATICA_PROPERTIES = ('autoCommit', 'autocommit', 'readOnly', 'readonly', 'transactionIsolation',
+                      'catalog', 'schema')
+
 
 class Connection(object):
     """Database connection.
@@ -46,17 +49,11 @@ class Connection(object):
         else:
             self.cursor_factory = Cursor
         self._cursors = []
-        # Extract properties to pass to OpenConnectionRequest
-        self._connection_args = {}
-        # The rest of the kwargs
-        self._filtered_args = {}
-        for k in kwargs:
-            if k in OPEN_CONNECTION_PROPERTIES:
-                self._connection_args[k] = kwargs[k]
-            else:
-                self._filtered_args[k] = kwargs[k]
+        self._phoenix_props, avatica_props_init = Connection._map_conn_props(kwargs)
         self.open()
-        self.set_session(**self._filtered_args)
+
+        # TODO we could probably optimize it away if the defaults are not changed
+        self.set_session(**avatica_props_init)
 
     def __del__(self):
         if not self._closed:
@@ -69,10 +66,36 @@ class Connection(object):
         if not self._closed:
             self.close()
 
+    @staticmethod
+    def _default_avatica_props():
+        return {'autoCommit': False,
+                'readOnly': False,
+                'transactionIsolation': 0,
+                'catalog': '',
+                'schema': ''}
+
+    @staticmethod
+    def _map_conn_props(conn_props):
+        """Sorts and prepocesses args that should be passed to Phoenix and Avatica"""
+
+        avatica_props = dict([(k, conn_props[k]) for k in conn_props.keys() if k in AVATICA_PROPERTIES])
+        phoenix_props = dict([(k, conn_props[k]) for k in conn_props.keys() if k not in AVATICA_PROPERTIES])
+        avatica_props = Connection._map_legacy_avatica_props(avatica_props)
+
+        return (phoenix_props, avatica_props)
+
+    @staticmethod
+    def _map_legacy_avatica_props(props):
+        if 'autocommit' in props:
+            props['autoCommit'] = bool(props.pop('autocommit'))
+        if 'readonly' in props:
+            props['readOnly'] = bool(props.pop('readonly'))
+        return props
+
     def open(self):
         """Opens the connection."""
         self._id = str(uuid.uuid4())
-        self._client.open_connection(self._id, info=self._connection_args)
+        self._client.open_connection(self._id, info=self._phoenix_props)
 
     def close(self):
         """Closes the connection.
@@ -83,7 +106,7 @@ class Connection(object):
         be automatically called at the end of the ``with`` block.
         """
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
+            raise ProgrammingError('The connection is already closed.')
         for cursor_ref in self._cursors:
             cursor = cursor_ref()
             if cursor is not None and not cursor._closed:
@@ -99,12 +122,12 @@ class Connection(object):
 
     def commit(self):
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
+            raise ProgrammingError('The connection is already closed.')
         self._client.commit(self._id)
 
     def rollback(self):
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
+            raise ProgrammingError('The connection is already closed.')
         self._client.rollback(self._id)
 
     def cursor(self, cursor_factory=None):
@@ -121,12 +144,12 @@ class Connection(object):
             A :class:`~phoenixdb.cursor.Cursor` object.
         """
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
+            raise ProgrammingError('The connection is already closed.')
         cursor = (cursor_factory or self.cursor_factory)(self)
         self._cursors.append(weakref.ref(cursor, self._cursors.remove))
         return cursor
 
-    def set_session(self, autocommit=None, readonly=None):
+    def set_session(self, **props):
         """Sets one or more parameters in the current connection.
 
         :param autocommit:
@@ -135,50 +158,51 @@ class Connection(object):
         :param readonly:
             Switch the connection to read-only mode.
         """
-        props = {}
-        if autocommit is not None:
-            props['autoCommit'] = bool(autocommit)
-        if readonly is not None:
-            props['readOnly'] = bool(readonly)
-        props = self._client.connection_sync(self._id, props)
-        self._autocommit = props.auto_commit
-        self._readonly = props.read_only
-        self._transactionisolation = props.transaction_isolation
+        props = Connection._map_legacy_avatica_props(props)
+        self._avatica_props = self._client.connection_sync_dict(self._id, props)
 
     @property
     def autocommit(self):
         """Read/write attribute for switching the connection's autocommit mode."""
-        return self._autocommit
+        return self._avatica_props['autoCommit']
 
     @autocommit.setter
     def autocommit(self, value):
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
-        props = self._client.connection_sync(self._id, {'autoCommit': bool(value)})
-        self._autocommit = props.auto_commit
+            raise ProgrammingError('The connection is already closed.')
+        self._avatica_props = self._client.connection_sync_dict(self._id, {'autoCommit': bool(value)})
 
     @property
     def readonly(self):
         """Read/write attribute for switching the connection's readonly mode."""
-        return self._readonly
+        return self._avatica_props['readOnly']
 
     @readonly.setter
     def readonly(self, value):
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
-        props = self._client.connection_sync(self._id, {'readOnly': bool(value)})
-        self._readonly = props.read_only
+            raise ProgrammingError('The connection is already closed.')
+        self._avatica_props = self._client.connection_sync_dict(self._id, {'readOnly': bool(value)})
 
     @property
     def transactionisolation(self):
-        return self._transactionisolation
+        return self._avatica_props['_transactionIsolation']
 
     @transactionisolation.setter
     def transactionisolation(self, value):
         if self._closed:
-            raise ProgrammingError('the connection is already closed')
-        props = self._client.connection_sync(self._id, {'transactionIsolation': bool(value)})
-        self._transactionisolation = props.transaction_isolation
+            raise ProgrammingError('The connection is already closed.')
+        self._avatica_props = self._client.connection_sync_dict(self._id, {'transactionIsolation': bool(value)})
+
+    def meta(self):
+        """Creates a new meta.
+
+        :returns:
+            A :class:`~phoenixdb.meta` object.
+        """
+        if self._closed:
+            raise ProgrammingError('The connection is already closed.')
+        meta = Meta(self)
+        return meta
 
 
 for name in errors.__all__:
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index 1dac835..5521ebc 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -162,14 +162,16 @@ class Cursor(object):
             offset=offset, frame_max_size=self.itersize)
         self._set_frame(frame)
 
+    def _process_result(self, result):
+        if result.own_statement:
+            self._set_id(result.statement_id)
+        self._set_signature(result.signature if result.HasField('signature') else None)
+        self._set_frame(result.first_frame if result.HasField('first_frame') else None)
+        self._updatecount = result.update_count
+
     def _process_results(self, results):
         if results:
-            result = results[0]
-            if result.own_statement:
-                self._set_id(result.statement_id)
-            self._set_signature(result.signature if result.HasField('signature') else None)
-            self._set_frame(result.first_frame if result.HasField('first_frame') else None)
-            self._updatecount = result.update_count
+            return self._process_result(results[0])
 
     def _transform_parameters(self, parameters):
         typed_parameters = []
diff --git a/python-phoenixdb/phoenixdb/meta.py b/python-phoenixdb/phoenixdb/meta.py
new file mode 100644
index 0000000..18ad147
--- /dev/null
+++ b/python-phoenixdb/phoenixdb/meta.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import logging
+
+from phoenixdb.errors import ProgrammingError
+from phoenixdb.cursor import DictCursor
+
+
+__all__ = ['Meta']
+
+logger = logging.getLogger(__name__)
+
+
+class Meta(object):
+    """Database meta for querying MetaData
+    """
+
+    def __init__(self, connection):
+        self._connection = connection
+
+    def get_catalogs(self):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_catalogs(self._connection._id)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return cursor.fetchall()
+
+    def get_schemas(self, catalog=None, schemaPattern=None):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_schemas(self._connection._id, catalog, schemaPattern)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return self._fix_default(cursor.fetchall(), schemaPattern=schemaPattern)
+
+    def get_tables(self, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_tables(
+            self._connection._id, catalog, schemaPattern, tableNamePattern, typeList=typeList)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return self._fix_default(cursor.fetchall(), catalog, schemaPattern)
+
+    def get_columns(self, catalog=None, schemaPattern=None, tableNamePattern=None,
+                    columnNamePattern=None):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_columns(
+            self._connection._id, catalog, schemaPattern, tableNamePattern, columnNamePattern)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return self._fix_default(cursor.fetchall(), catalog, schemaPattern)
+
+    def get_table_types(self):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_table_types(self._connection._id)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return cursor.fetchall()
+
+    def get_type_info(self):
+        if self._connection._closed:
+            raise ProgrammingError('The connection is already closed.')
+        result = self._connection._client.get_type_info(self._connection._id)
+        with DictCursor(self._connection) as cursor:
+            cursor._process_result(result)
+            return cursor.fetchall()
+
+    def _fix_default(self, rows, catalog=None, schemaPattern=None):
+        '''Workaround for PHOENIX-6003'''
+        if schemaPattern == '':
+            rows = [row for row in rows if row['TABLE_SCHEM'] is None]
+        if catalog == '':
+            rows = [row for row in rows if row['TABLE_CATALOG'] is None]
+        # Couldn't find a sane way to do it that works on 2 and 3
+        if sys.version_info.major == 3:
+            return [{k: v or '' for k, v in row.items()} for row in rows]
+        else:
+            return [{k: v or '' for k, v in row.iteritems()} for row in rows]
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index ebe8f12..ad75d6d 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -126,76 +126,66 @@ class PhoenixDialect(DefaultDialect):
         ))
         return [phoenix_url], connect_args
 
-    def has_table(self, connection, table_name, schema=None):
+    def has_table(self, connection, table_name, schema=None, **kw):
         if schema is None:
-            query = "SELECT 1 FROM system.catalog WHERE table_name = ? LIMIT 1"
-            params = [table_name.upper()]
-        else:
-            query = "SELECT 1 FROM system.catalog WHERE table_name = ? AND TABLE_SCHEM = ? LIMIT 1"
-            params = [table_name.upper(), schema.upper()]
-        return connection.execute(query, params).first() is not None
+            schema = ''
+        return bool(connection.connect().connection.meta().get_tables(
+            tableNamePattern=table_name,
+            schemaPattern=schema,
+            typeList=('TABLE', 'SYSTEM_TABLE')))
 
     def get_schema_names(self, connection, **kw):
-        query = "SELECT DISTINCT TABLE_SCHEM FROM SYSTEM.CATALOG"
-        return [row[0] for row in connection.execute(query)]
+        schemas = connection.connect().connection.meta().get_schemas()
+        return [schema['TABLE_SCHEM'] for schema in schemas]
 
-    def get_table_names(self, connection, schema=None, **kw):
+    def get_table_names(self, connection, schema=None, order_by=None, **kw):
+        '''order_by is ignored'''
         if schema is None:
-            query = "SELECT DISTINCT table_name FROM SYSTEM.CATALOG"
-            params = []
-        else:
-            query = "SELECT DISTINCT table_name FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = ? "
-            params = [schema.upper()]
-        return [row[0] for row in connection.execute(query, params)]
+            schema = ''
+        tables = connection.connect().connection.meta().get_tables(
+            schemaPattern=schema, typeList=('TABLE', 'SYSTEM_TABLE'))
+        return [table['TABLE_NAME'] for table in tables]
+
+    def get_view_names(self, connection, schema=None, **kw):
+        if schema is None:
+            schema = ''
+        return connection.connect().connection.meta().get_tables(schemaPattern=schema,
+                                                                 typeList=('VIEW'))
 
     def get_columns(self, connection, table_name, schema=None, **kw):
         if schema is None:
-            query = """SELECT COLUMN_NAME,  DATA_TYPE, NULLABLE
-                    FROM system.catalog
-                    WHERE table_name = ?
-                    AND ORDINAL_POSITION is not null
-                    ORDER BY ORDINAL_POSITION"""
-            params = [table_name.upper()]
-        else:
-            query = """SELECT COLUMN_NAME, DATA_TYPE, NULLABLE
-                    FROM system.catalog
-                    WHERE TABLE_SCHEM = ?
-                    AND table_name = ?
-                    AND ORDINAL_POSITION is not null
-                    ORDER BY ORDINAL_POSITION"""
-            params = [schema.upper(), table_name.upper()]
-
-        # get all of the fields for this table
-        c = connection.execute(query, params)
-        cols = []
-        while True:
-            row = c.fetchone()
-            if row is None:
-                break
-            name = row[0]
-            col_type = COLUMN_DATA_TYPE[row[1]]
-            nullable = row[2] == 1 if True else False
-
-            col_d = {
-                'name': name,
-                'type': col_type,
-                'nullable': nullable,
-                'default': None
-            }
-
-            cols.append(col_d)
-        return cols
-
-    # TODO This should be possible to implement
-    def get_pk_constraint(self, conn, table_name, schema=None, **kw):
+            schema = ''
+        raw = connection.connect().connection.meta().get_columns(
+            schemaPattern=schema, tableNamePattern=table_name)
+        return [self._map_column(row) for row in raw]
+
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+        if schema is None:
+            schema = ''
+        columns = connection.connect().connection.meta().get_columns(
+            schemaPattern=schema, tableNamePattern=table_name, *kw)
+        pk_columns = [col['COLUMN_NAME'] for col in columns if col['KEY_SEQ'] > 0]
+        return {'constrained_columns': pk_columns}
+
+    def get_indexes(self, conn, table_name, schema=None, **kw):
+        '''This information does not seem to be exposed via Avatica
+        TODO: Implement by directly querying SYSTEM tables ? '''
         return []
 
     def get_foreign_keys(self, conn, table_name, schema=None, **kw):
+        '''Foreign keys are a foreign concept to Phoenix,
+        but SqlAlchemy cannot parse the DB schema if it's not implemented '''
         return []
 
-    # TODO This should be possible to implement
-    def get_indexes(self, conn, table_name, schema=None, **kw):
-        return []
+    def _map_column(self, raw):
+        cooked = {}
+        cooked['name'] = raw['COLUMN_NAME']
+        cooked['type'] = COLUMN_DATA_TYPE[raw['TYPE_ID']]
+        cooked['nullable'] = bool(raw['IS_NULLABLE'])
+        cooked['autoincrement'] = bool(raw['IS_AUTOINCREMENT'])
+        cooked['comment'] = raw['REMARKS']
+        cooked['default'] = None  # Not apparent how to get this from the metatdata
+        return cooked
 
 
 class TINYINT(types.Integer):
diff --git a/python-phoenixdb/phoenixdb/tests/test_db.py b/python-phoenixdb/phoenixdb/tests/test_db.py
index 04f19d9..da12b23 100644
--- a/python-phoenixdb/phoenixdb/tests/test_db.py
+++ b/python-phoenixdb/phoenixdb/tests/test_db.py
@@ -16,6 +16,7 @@
 import unittest
 
 import phoenixdb.cursor
+from phoenixdb.connection import Connection
 from phoenixdb.errors import InternalError
 from phoenixdb.tests import DatabaseTestCase, TEST_DB_URL
 
@@ -107,3 +108,121 @@ class PhoenixDatabaseTest(DatabaseTestCase):
             self.conn.autocommit = True
             cursor.execute("SELECT * FROM test ORDER BY id")
             self.assertEqual(cursor.fetchall(), [[1, 'one'], [2, 'two']])
+
+    def test_conn_props(self):
+        phoenix_args, avatica_args = Connection._map_conn_props(
+            {'autoCommit': True,
+             'readonly': True,
+             'transactionIsolation': 3,
+             'schema': 'bubu',
+             'phoenixArg': 'phoenixArg'})
+        self.assertEqual(phoenix_args, {'phoenixArg': 'phoenixArg'})
+        self.assertEqual(avatica_args, {'autoCommit': True,
+                                        'readOnly': True,
+                                        'transactionIsolation': 3,
+                                        'schema': 'bubu'})
+
+    def test_meta(self):
+        with self.conn.cursor() as cursor:
+            try:
+                cursor.execute('drop table if exists DEFAULT_TABLE')
+                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+                cursor.execute('drop table if exists B_SCHMEA.B_TABLE')
+
+                cursor.execute('create table DEFAULT_TABLE (ID integer primary key)')
+                cursor.execute('create table A_SCHEMA.A_TABLE (ID_A integer primary key)')
+                cursor.execute('create table B_SCHEMA.B_TABLE (ID_B integer primary key)')
+
+                meta = self.conn.meta()
+
+                self.assertEqual(meta.get_catalogs(), [])
+
+                self.assertEqual(meta.get_schemas(), [
+                    {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''},
+                    {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''},
+                    {'TABLE_SCHEM': 'B_SCHEMA', 'TABLE_CATALOG': ''},
+                    {'TABLE_SCHEM': 'SYSTEM', 'TABLE_CATALOG': ''}])
+
+                self.assertEqual(meta.get_schemas(schemaPattern=''), [
+                    {'TABLE_SCHEM': '', 'TABLE_CATALOG': ''}])
+
+                self.assertEqual(meta.get_schemas(schemaPattern='A_SCHEMA'), [
+                    {'TABLE_SCHEM': 'A_SCHEMA', 'TABLE_CATALOG': ''}])
+
+                a_tables = meta.get_tables()
+                self.assertTrue(len(a_tables) > 3)  # Don't know how many tables SYSTEM has
+
+                a_tables = meta.get_tables(schemaPattern='')
+                self.assertEqual(len(a_tables), 1)
+                self.assertTrue(a_tables[0]['TABLE_NAME'] == 'DEFAULT_TABLE')
+
+                a_tables = meta.get_tables(schemaPattern='A_SCHEMA')
+                self.assertEqual(len(a_tables), 1)
+                self.assertTrue(a_tables[0]['TABLE_NAME'] == 'A_TABLE')
+
+                a_columns = meta.get_columns(schemaPattern='A_SCHEMA', tableNamePattern='A_TABLE')
+                self.assertEqual(len(a_columns), 1)
+                self.assertTrue(a_columns[0]['COLUMN_NAME'] == 'ID_A')
+
+                self.assertTrue(all(elem in meta.get_table_types() for elem in [
+                    {'TABLE_TYPE': 'INDEX'},
+                    {'TABLE_TYPE': 'SEQUENCE'},
+                    {'TABLE_TYPE': 'SYSTEM TABLE'},
+                    {'TABLE_TYPE': 'TABLE'},
+                    {'TABLE_TYPE': 'VIEW'}]))
+
+                self.assertEqual(meta.get_type_info(), [])
+            finally:
+                cursor.execute('drop table if exists DEFAULT_TABLE')
+                cursor.execute('drop table if exists A_SCHEMA.A_TABLE')
+                cursor.execute('drop table if exists B_SCHEMA.B_TABLE')
+
+    @unittest.skip("https://issues.apache.org/jira/browse/PHOENIX-6004")
+    def test_case_sensitivity(self):
+        with self.conn.cursor() as cursor:
+            try:
+                cursor.execute('drop table if exists AAA')
+                cursor.execute('drop table if exists "aaa"')
+                cursor.execute('drop table if exists "Aaa"')
+
+                cursor.execute('create table AAA (ID integer primary key, YYY integer)')
+                cursor.execute('create table "aaa" ("ID_x" integer primary key, YYY integer, "Yyy" integer, "yyy" integer)')
+                cursor.execute('create table "Aaa" (ID_X integer primary key, ZZZ integer, "Zzz" integer, "zzz" integer)')
+
+                cursor.execute('upsert into AAA values (1, 2)')
+                cursor.execute('upsert into "aaa" values (11, 12, 13, 14)')
+                cursor.execute('upsert into "Aaa" values (21, 22, 23, 24)')
+
+                cursor.execute('select YYY from AAA')
+                self.assertEqual(cursor.fetchone(), [2])
+
+                cursor.execute('select YYY from "aaa"')
+                self.assertEqual(cursor.fetchone(), [12])
+
+                cursor.execute('select "YYY" from "aaa"')
+                self.assertEqual(cursor.fetchone(), [12])
+
+                cursor.execute('select "Yyy" from "aaa"')
+                self.assertEqual(cursor.fetchone(), [13])
+
+                meta = self.conn.meta()
+
+                self.assertEquals(len(meta.get_tables(schemaPattern='')), 3)
+
+                print(meta.get_columns(schemaPattern='',
+                                       tableNamePattern='"aaa"'))
+
+                self.assertEquals(len(meta.get_tables(schemaPattern='',
+                                                      tableNamePattern='AAA')), 1)
+                self.assertEquals(len(meta.get_tables(schemaPattern='',
+                                                      tableNamePattern='"aaa"')), 1)
+                self.assertEquals(meta.get_columns(tableNamePattern='AAA',
+                                                   columnNamePattern='YYY'), 1)
+                self.assertEquals(meta.get_columns(tableNamePattern='AAA',
+                                                   columnNamePattern='yyy'), 1)
+                self.assertEquals(meta.get_columns(tableNamePattern='AAA',
+                                                   columnNamePattern='"yyy"'), 0)
+            finally:
+                cursor.execute('drop table if exists AAA')
+                cursor.execute('drop table if exists "aaa"')
+                cursor.execute('drop table if exists "Aaa"')
diff --git a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
index 52bff73..99df3be 100644
--- a/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
+++ b/python-phoenixdb/phoenixdb/tests/test_sqlalchemy.py
@@ -35,7 +35,7 @@ class SQLAlchemyTest(unittest.TestCase):
         engine = self._create_engine()
         # connection = engine.connect()
         metadata = db.MetaData()
-        catalog = db.Table('CATALOG', metadata, autoload=True, autoload_with=engine)
+        catalog = db.Table('CATALOG', metadata, schema='SYSTEM', autoload=True, autoload_with=engine)
         self.assertIn('TABLE_NAME', catalog.columns.keys())
 
     def test_textual(self):
@@ -52,6 +52,43 @@ class SQLAlchemyTest(unittest.TestCase):
             finally:
                 connection.execute('drop table if exists ALCHEMY_TEST')
 
+    def test_schema_filtering(self):
+        engine = self._create_engine()
+        with engine.connect() as connection:
+            try:
+                connection.execute('drop table if exists ALCHEMY_TEST')
+                connection.execute('drop table if exists A.ALCHEMY_TEST_A')
+                connection.execute('drop table if exists B.ALCHEMY_TEST_B')
+
+                connection.execute(text('create table ALCHEMY_TEST (ID integer primary key)'))
+                connection.execute(text('create table A.ALCHEMY_TEST_A (ID_A integer primary key)'))
+                connection.execute(text('create table B.ALCHEMY_TEST_B (ID_B integer primary key)'))
+
+                inspector = db.inspect(engine)
+                print(inspector.default_schema_name)
+
+                self.assertEqual(inspector.get_schema_names(), ['', 'A', 'B', 'SYSTEM'])
+
+                self.assertEqual(inspector.get_table_names(), ['ALCHEMY_TEST'])
+                self.assertEqual(inspector.get_table_names(''), ['ALCHEMY_TEST'])
+                self.assertEqual(inspector.get_table_names('A'), ['ALCHEMY_TEST_A'])
+                self.assertEqual(inspector.get_table_names('B'), ['ALCHEMY_TEST_B'])
+
+                self.assertEqual(inspector.get_columns('ALCHEMY_TEST').pop()['name'], 'ID')
+                self.assertEqual(
+                    inspector.get_columns('ALCHEMY_TEST', '').pop()['name'], 'ID')
+                self.assertEqual(
+                    inspector.get_columns('ALCHEMY_TEST_A', 'A').pop()['name'], 'ID_A')
+
+                self.assertTrue(engine.has_table('ALCHEMY_TEST'))
+                self.assertFalse(engine.has_table('ALCHEMY_TEST', 'A'))
+                self.assertTrue(engine.has_table('ALCHEMY_TEST_A', 'A'))
+                self.assertFalse(engine.has_table('ALCHEMY_TEST', 'A'))
+            finally:
+                connection.execute('drop table if exists ALCHEMY_TEST')
+                connection.execute('drop table if exists A.ALCHEMY_TEST_A')
+                connection.execute('drop table if exists B.ALCHEMY_TEST_B')
+
     def test_reflection(self):
         engine = self._create_engine()
         with engine.connect() as connection:
@@ -65,7 +102,7 @@ class SQLAlchemyTest(unittest.TestCase):
                 city VARCHAR NOT NULL,
                 population BIGINT
                 CONSTRAINT my_pk PRIMARY KEY (state, city))'''))
-                columns_result = inspector.get_columns('us_population')
+                columns_result = inspector.get_columns('US_POPULATION')
                 self.assertEqual(len(columns_result), 3)
             finally:
                 connection.execute('drop table if exists us_population')