You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2018/03/21 20:14:03 UTC
[12/30] phoenix git commit: PHOENIX-4636 Add the python PQS driver
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/cursor.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/cursor.py b/python/phoenixdb/cursor.py
new file mode 100644
index 0000000..8be7bed
--- /dev/null
+++ b/python/phoenixdb/cursor.py
@@ -0,0 +1,347 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import collections
+from phoenixdb.types import TypeHelper
+from phoenixdb.errors import ProgrammingError, InternalError
+from phoenixdb.avatica.proto import common_pb2
+
+__all__ = ['Cursor', 'ColumnDescription', 'DictCursor']
+
+logger = logging.getLogger(__name__)
+
+# TODO see note in Cursor.rowcount()
+MAX_INT = 2 ** 64 - 1
+
+ColumnDescription = collections.namedtuple('ColumnDescription', 'name type_code display_size internal_size precision scale null_ok')
+"""Named tuple for representing results from :attr:`Cursor.description`."""
+
+
+class Cursor(object):
+ """Database cursor for executing queries and iterating over results.
+
+ You should not construct this object manually, use :meth:`Connection.cursor() <phoenixdb.connection.Connection.cursor>` instead.
+ """
+
+ arraysize = 1
+ """
+ Read/write attribute specifying the number of rows to fetch
+ at a time with :meth:`fetchmany`. It defaults to 1 meaning to
+ fetch a single row at a time.
+ """
+
+ itersize = 2000
+ """
+ Read/write attribute specifying the number of rows to fetch
+ from the backend at each network roundtrip during iteration
+ on the cursor. The default is 2000.
+ """
+
+ def __init__(self, connection, id=None):
+ self._connection = connection
+ self._id = id
+ self._signature = None
+ self._column_data_types = []
+ self._frame = None
+ self._pos = None
+ self._closed = False
+ self.arraysize = self.__class__.arraysize
+ self.itersize = self.__class__.itersize
+ self._updatecount = -1
+
+ def __del__(self):
+ if not self._connection._closed and not self._closed:
+ self.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if not self._closed:
+ self.close()
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ row = self.fetchone()
+ if row is None:
+ raise StopIteration
+ return row
+
+ next = __next__
+
+ def close(self):
+ """Closes the cursor.
+ No further operations are allowed once the cursor is closed.
+
+ If the cursor is used in a ``with`` statement, this method will
+ be automatically called at the end of the ``with`` block.
+ """
+ if self._closed:
+ raise ProgrammingError('the cursor is already closed')
+ if self._id is not None:
+ self._connection._client.close_statement(self._connection._id, self._id)
+ self._id = None
+ self._signature = None
+ self._column_data_types = []
+ self._frame = None
+ self._pos = None
+ self._closed = True
+
+ @property
+ def closed(self):
+ """Read-only attribute specifying if the cursor is closed or not."""
+ return self._closed
+
+ @property
+ def description(self):
+ if self._signature is None:
+ return None
+ description = []
+ for column in self._signature.columns:
+ description.append(ColumnDescription(
+ column.column_name,
+ column.type.name,
+ column.display_size,
+ None,
+ column.precision,
+ column.scale,
+ None if column.nullable == 2 else bool(column.nullable),
+ ))
+ return description
+
+ def _set_id(self, id):
+ if self._id is not None and self._id != id:
+ self._connection._client.close_statement(self._connection._id, self._id)
+ self._id = id
+
+ def _set_signature(self, signature):
+ self._signature = signature
+ self._column_data_types = []
+ self._parameter_data_types = []
+ if signature is None:
+ return
+
+ for column in signature.columns:
+ dtype = TypeHelper.from_class(column.column_class_name)
+ self._column_data_types.append(dtype)
+
+ for parameter in signature.parameters:
+ dtype = TypeHelper.from_class(parameter.class_name)
+ self._parameter_data_types.append(dtype)
+
+ def _set_frame(self, frame):
+ self._frame = frame
+ self._pos = None
+
+ if frame is not None:
+ if frame.rows:
+ self._pos = 0
+ elif not frame.done:
+ raise InternalError('got an empty frame, but the statement is not done yet')
+
+ def _fetch_next_frame(self):
+ offset = self._frame.offset + len(self._frame.rows)
+ frame = self._connection._client.fetch(
+ self._connection._id, self._id,
+ offset=offset, frame_max_size=self.itersize)
+ self._set_frame(frame)
+
+ def _process_results(self, results):
+ if results:
+ result = results[0]
+ if result.own_statement:
+ self._set_id(result.statement_id)
+ self._set_signature(result.signature if result.HasField('signature') else None)
+ self._set_frame(result.first_frame if result.HasField('first_frame') else None)
+ self._updatecount = result.update_count
+
+ def _transform_parameters(self, parameters):
+ typed_parameters = []
+ for value, data_type in zip(parameters, self._parameter_data_types):
+ field_name, rep, mutate_to, cast_from = data_type
+ typed_value = common_pb2.TypedValue()
+
+ if value is None:
+ typed_value.null = True
+ typed_value.type = common_pb2.NULL
+ else:
+ typed_value.null = False
+
+ # use the mutator function
+ if mutate_to is not None:
+ value = mutate_to(value)
+
+ typed_value.type = rep
+ setattr(typed_value, field_name, value)
+
+ typed_parameters.append(typed_value)
+ return typed_parameters
+
+ def execute(self, operation, parameters=None):
+ if self._closed:
+ raise ProgrammingError('the cursor is already closed')
+ self._updatecount = -1
+ self._set_frame(None)
+ if parameters is None:
+ if self._id is None:
+ self._set_id(self._connection._client.create_statement(self._connection._id))
+ results = self._connection._client.prepare_and_execute(
+ self._connection._id, self._id,
+ operation, first_frame_max_size=self.itersize)
+ self._process_results(results)
+ else:
+ statement = self._connection._client.prepare(
+ self._connection._id, operation)
+ self._set_id(statement.id)
+ self._set_signature(statement.signature)
+
+ results = self._connection._client.execute(
+ self._connection._id, self._id,
+ statement.signature, self._transform_parameters(parameters),
+ first_frame_max_size=self.itersize)
+ self._process_results(results)
+
+ def executemany(self, operation, seq_of_parameters):
+ if self._closed:
+ raise ProgrammingError('the cursor is already closed')
+ self._updatecount = -1
+ self._set_frame(None)
+ statement = self._connection._client.prepare(
+ self._connection._id, operation, max_rows_total=0)
+ self._set_id(statement.id)
+ self._set_signature(statement.signature)
+ for parameters in seq_of_parameters:
+ self._connection._client.execute(
+ self._connection._id, self._id,
+ statement.signature, self._transform_parameters(parameters),
+ first_frame_max_size=0)
+
+ def _transform_row(self, row):
+ """Transforms a Row into Python values.
+
+ :param row:
+ A ``common_pb2.Row`` object.
+
+ :returns:
+ A list of values casted into the correct Python types.
+
+ :raises:
+ NotImplementedError
+ """
+ tmp_row = []
+
+ for i, column in enumerate(row.value):
+ if column.has_array_value:
+ raise NotImplementedError('array types are not supported')
+ elif column.scalar_value.null:
+ tmp_row.append(None)
+ else:
+ field_name, rep, mutate_to, cast_from = self._column_data_types[i]
+
+ # get the value from the field_name
+ value = getattr(column.scalar_value, field_name)
+
+ # cast the value
+ if cast_from is not None:
+ value = cast_from(value)
+
+ tmp_row.append(value)
+ return tmp_row
+
+ def fetchone(self):
+ if self._frame is None:
+ raise ProgrammingError('no select statement was executed')
+ if self._pos is None:
+ return None
+ rows = self._frame.rows
+ row = self._transform_row(rows[self._pos])
+ self._pos += 1
+ if self._pos >= len(rows):
+ self._pos = None
+ if not self._frame.done:
+ self._fetch_next_frame()
+ return row
+
+ def fetchmany(self, size=None):
+ if size is None:
+ size = self.arraysize
+ rows = []
+ while size > 0:
+ row = self.fetchone()
+ if row is None:
+ break
+ rows.append(row)
+ size -= 1
+ return rows
+
+ def fetchall(self):
+ rows = []
+ while True:
+ row = self.fetchone()
+ if row is None:
+ break
+ rows.append(row)
+ return rows
+
+ def setinputsizes(self, sizes):
+ pass
+
+ def setoutputsize(self, size, column=None):
+ pass
+
+ @property
+ def connection(self):
+ """Read-only attribute providing access to the :class:`Connection <phoenixdb.connection.Connection>`
+ object this cursor was created from."""
+ return self._connection
+
+ @property
+ def rowcount(self):
+ """Read-only attribute specifying the number of rows affected by
+ the last executed DML statement or -1 if the number cannot be
+ determined. Note that this will always be set to -1 for select
+ queries."""
+ # TODO instead of -1, this ends up being set to Integer.MAX_VALUE
+ if self._updatecount == MAX_INT:
+ return -1
+ return self._updatecount
+
+ @property
+ def rownumber(self):
+ """Read-only attribute providing the current 0-based index of the
+ cursor in the result set or ``None`` if the index cannot be
+ determined.
+
+ The index can be seen as index of the cursor in a sequence
+ (the result set). The next fetch operation will fetch the
+ row indexed by :attr:`rownumber` in that sequence.
+ """
+ if self._frame is not None and self._pos is not None:
+ return self._frame.offset + self._pos
+ return self._pos
+
+
+class DictCursor(Cursor):
+ """A cursor which returns results as a dictionary"""
+
+ def _transform_row(self, row):
+ row = super(DictCursor, self)._transform_row(row)
+ d = {}
+ for ind, val in enumerate(row):
+ d[self._signature.columns[ind].column_name] = val
+ return d
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/errors.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/errors.py b/python/phoenixdb/errors.py
new file mode 100644
index 0000000..a046c0d
--- /dev/null
+++ b/python/phoenixdb/errors.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+__all__ = [
+ 'Warning', 'Error', 'InterfaceError', 'DatabaseError', 'DataError',
+ 'OperationalError', 'IntegrityError', 'InternalError',
+ 'ProgrammingError', 'NotSupportedError',
+]
+
+try:
+ _StandardError = StandardError
+except NameError:
+ _StandardError = Exception
+
+
+class Warning(_StandardError):
+ """Not used by this package, only defined for compatibility
+ with DB API 2.0."""
+
+
+class Error(_StandardError):
+ """Exception that is the base class of all other error exceptions.
+ You can use this to catch all errors with one single except statement."""
+
+ def __init__(self, message, code=None, sqlstate=None, cause=None):
+ super(_StandardError, self).__init__(message, code, sqlstate, cause)
+
+ @property
+ def message(self):
+ return self.args[0]
+
+ @property
+ def code(self):
+ return self.args[1]
+
+ @property
+ def sqlstate(self):
+ return self.args[2]
+
+ @property
+ def cause(self):
+ return self.args[3]
+
+
+class InterfaceError(Error):
+ """Exception raised for errors that are related to the database
+ interface rather than the database itself."""
+
+
+class DatabaseError(Error):
+ """Exception raised for errors that are related to the database."""
+
+
+class DataError(DatabaseError):
+ """Exception raised for errors that are due to problems with the
+ processed data like division by zero, numeric value out of range,
+ etc."""
+
+
+class OperationalError(DatabaseError):
+ """Raised for errors that are related to the database's operation and not
+ necessarily under the control of the programmer, e.g. an unexpected
+ disconnect occurs, the data source name is not found, a transaction could
+ not be processed, a memory allocation error occurred during
+ processing, etc."""
+
+
+class IntegrityError(DatabaseError):
+ """Raised when the relational integrity of the database is affected, e.g. a foreign key check fails."""
+
+
+class InternalError(DatabaseError):
+ """Raised when the database encounters an internal problem."""
+
+
+class ProgrammingError(DatabaseError):
+ """Raises for programming errors, e.g. table not found, syntax error, etc."""
+
+
+class NotSupportedError(DatabaseError):
+ """Raised when using an API that is not supported by the database."""
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/__init__.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/__init__.py b/python/phoenixdb/tests/__init__.py
new file mode 100644
index 0000000..ec9a79b
--- /dev/null
+++ b/python/phoenixdb/tests/__init__.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+import phoenixdb
+
+TEST_DB_URL = os.environ.get('PHOENIXDB_TEST_DB_URL')
+
+
+@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
+class DatabaseTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.conn = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ self.cleanup_tables = []
+
+ def tearDown(self):
+ self.doCleanups()
+ self.conn.close()
+
+ def addTableCleanup(self, name):
+ def dropTable():
+ with self.conn.cursor() as cursor:
+ cursor.execute("DROP TABLE IF EXISTS {}".format(name))
+ self.addCleanup(dropTable)
+
+ def createTable(self, name, columns):
+ with self.conn.cursor() as cursor:
+ cursor.execute("DROP TABLE IF EXISTS {}".format(name))
+ cursor.execute("CREATE TABLE {} ({})".format(name, columns))
+ self.addTableCleanup(name)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/dbapi20.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/dbapi20.py b/python/phoenixdb/tests/dbapi20.py
new file mode 100644
index 0000000..f176400
--- /dev/null
+++ b/python/phoenixdb/tests/dbapi20.py
@@ -0,0 +1,857 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+''' Python DB API 2.0 driver compliance unit test suite.
+
+ This software is Public Domain and may be used without restrictions.
+
+ "Now we have booze and barflies entering the discussion, plus rumours of
+ DBAs on drugs... and I won't tell you what flashes through my mind each
+ time I read the subject line with 'Anal Compliance' in it. All around
+ this is turning out to be a thoroughly unwholesome unit test."
+
+ -- Ian Bicking
+'''
+
+__version__ = '1.14.3'
+
+import unittest
+import time
+import sys
+
+if sys.version[0] >= '3': #python 3.x
+ _BaseException = Exception
+ def _failUnless(self, expr, msg=None):
+ self.assertTrue(expr, msg)
+else: #python 2.x
+ from exceptions import StandardError as _BaseException
+ def _failUnless(self, expr, msg=None):
+ self.failUnless(expr, msg) ## deprecated since Python 2.6
+
+def str2bytes(sval):
+ if sys.version_info < (3,0) and isinstance(sval, str):
+ sval = sval.decode("latin1")
+ return sval.encode("latin1") #python 3 make unicode into bytes
+
+class DatabaseAPI20Test(unittest.TestCase):
+ ''' Test a database self.driver for DB API 2.0 compatibility.
+ This implementation tests Gadfly, but the TestCase
+ is structured so that other self.drivers can subclass this
+ test case to ensure compiliance with the DB-API. It is
+ expected that this TestCase may be expanded in the future
+ if ambiguities or edge conditions are discovered.
+
+ The 'Optional Extensions' are not yet being tested.
+
+ self.drivers should subclass this test, overriding setUp, tearDown,
+ self.driver, connect_args and connect_kw_args. Class specification
+ should be as follows:
+
+ import dbapi20
+ class mytest(dbapi20.DatabaseAPI20Test):
+ [...]
+
+ Don't 'import DatabaseAPI20Test from dbapi20', or you will
+ confuse the unit tester - just 'import dbapi20'.
+ '''
+
+ # The self.driver module. This should be the module where the 'connect'
+ # method is to be found
+ driver = None
+ connect_args = () # List of arguments to pass to connect
+ connect_kw_args = {} # Keyword arguments for connect
+ table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
+
+ ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
+ ddl2 = 'create table %sbarflys (name varchar(20), drink varchar(30))' % table_prefix
+ xddl1 = 'drop table %sbooze' % table_prefix
+ xddl2 = 'drop table %sbarflys' % table_prefix
+ insert = 'insert'
+
+ lowerfunc = 'lower' # Name of stored procedure to convert string->lowercase
+
+ # Some drivers may need to override these helpers, for example adding
+ # a 'commit' after the execute.
+ def executeDDL1(self,cursor):
+ cursor.execute(self.ddl1)
+
+ def executeDDL2(self,cursor):
+ cursor.execute(self.ddl2)
+
+ def setUp(self):
+ ''' self.drivers should override this method to perform required setup
+ if any is necessary, such as creating the database.
+ '''
+ pass
+
+ def tearDown(self):
+ ''' self.drivers should override this method to perform required cleanup
+ if any is necessary, such as deleting the test database.
+ The default drops the tables that may be created.
+ '''
+ try:
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ for ddl in (self.xddl1,self.xddl2):
+ try:
+ cur.execute(ddl)
+ con.commit()
+ except self.driver.Error:
+ # Assume table didn't exist. Other tests will check if
+ # execute is busted.
+ pass
+ finally:
+ con.close()
+ except _BaseException:
+ pass
+
+ def _connect(self):
+ try:
+ r = self.driver.connect(
+ *self.connect_args,**self.connect_kw_args
+ )
+ except AttributeError:
+ self.fail("No connect method found in self.driver module")
+ return r
+
+ def test_connect(self):
+ con = self._connect()
+ con.close()
+
+ def test_apilevel(self):
+ try:
+ # Must exist
+ apilevel = self.driver.apilevel
+ # Must equal 2.0
+ self.assertEqual(apilevel,'2.0')
+ except AttributeError:
+ self.fail("Driver doesn't define apilevel")
+
+ def test_threadsafety(self):
+ try:
+ # Must exist
+ threadsafety = self.driver.threadsafety
+ # Must be a valid value
+ _failUnless(self, threadsafety in (0,1,2,3))
+ except AttributeError:
+ self.fail("Driver doesn't define threadsafety")
+
+ def test_paramstyle(self):
+ try:
+ # Must exist
+ paramstyle = self.driver.paramstyle
+ # Must be a valid value
+ _failUnless(self, paramstyle in (
+ 'qmark','numeric','named','format','pyformat'
+ ))
+ except AttributeError:
+ self.fail("Driver doesn't define paramstyle")
+
+ def test_Exceptions(self):
+ # Make sure required exceptions exist, and are in the
+ # defined heirarchy.
+ if sys.version[0] == '3': #under Python 3 StardardError no longer exists
+ self.assertTrue(issubclass(self.driver.Warning,Exception))
+ self.assertTrue(issubclass(self.driver.Error,Exception))
+ else:
+ self.failUnless(issubclass(self.driver.Warning,StandardError))
+ self.failUnless(issubclass(self.driver.Error,StandardError))
+
+ _failUnless(self,
+ issubclass(self.driver.InterfaceError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.DatabaseError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.OperationalError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.IntegrityError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.InternalError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.ProgrammingError,self.driver.Error)
+ )
+ _failUnless(self,
+ issubclass(self.driver.NotSupportedError,self.driver.Error)
+ )
+
+ def test_ExceptionsAsConnectionAttributes(self):
+ # OPTIONAL EXTENSION
+ # Test for the optional DB API 2.0 extension, where the exceptions
+ # are exposed as attributes on the Connection object
+ # I figure this optional extension will be implemented by any
+ # driver author who is using this test suite, so it is enabled
+ # by default.
+ con = self._connect()
+ drv = self.driver
+ _failUnless(self,con.Warning is drv.Warning)
+ _failUnless(self,con.Error is drv.Error)
+ _failUnless(self,con.InterfaceError is drv.InterfaceError)
+ _failUnless(self,con.DatabaseError is drv.DatabaseError)
+ _failUnless(self,con.OperationalError is drv.OperationalError)
+ _failUnless(self,con.IntegrityError is drv.IntegrityError)
+ _failUnless(self,con.InternalError is drv.InternalError)
+ _failUnless(self,con.ProgrammingError is drv.ProgrammingError)
+ _failUnless(self,con.NotSupportedError is drv.NotSupportedError)
+
+
+ def test_commit(self):
+ con = self._connect()
+ try:
+ # Commit must work, even if it doesn't do anything
+ con.commit()
+ finally:
+ con.close()
+
+ def test_rollback(self):
+ con = self._connect()
+ # If rollback is defined, it should either work or throw
+ # the documented exception
+ if hasattr(con,'rollback'):
+ try:
+ con.rollback()
+ except self.driver.NotSupportedError:
+ pass
+
+ def test_cursor(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ def test_cursor_isolation(self):
+ con = self._connect()
+ try:
+ # Make sure cursors created from the same connection have
+ # the documented transaction isolation level
+ cur1 = con.cursor()
+ cur2 = con.cursor()
+ self.executeDDL1(cur1)
+ cur1.execute("%s into %sbooze values ('Victoria Bitter')" % (
+ self.insert, self.table_prefix
+ ))
+ cur2.execute("select name from %sbooze" % self.table_prefix)
+ booze = cur2.fetchall()
+ self.assertEqual(len(booze),1)
+ self.assertEqual(len(booze[0]),1)
+ self.assertEqual(booze[0][0],'Victoria Bitter')
+ finally:
+ con.close()
+
+ def test_description(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description should be none after executing a '
+ 'statement that can return no rows (such as DDL)'
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(len(cur.description),1,
+ 'cursor.description describes too many columns'
+ )
+ self.assertEqual(len(cur.description[0]),7,
+ 'cursor.description[x] tuples must have 7 elements'
+ )
+ self.assertEqual(cur.description[0][0].lower(),'name',
+ 'cursor.description[x][0] must return column name'
+ )
+ self.assertEqual(cur.description[0][1],self.driver.STRING,
+ 'cursor.description[x][1] must return column type. Got %r'
+ % cur.description[0][1]
+ )
+
+ # Make sure self.description gets reset
+ self.executeDDL2(cur)
+ self.assertEqual(cur.description,None,
+ 'cursor.description not being set to None when executing '
+ 'no-result statements (eg. DDL)'
+ )
+ finally:
+ con.close()
+
+ def test_rowcount(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ _failUnless(self,cur.rowcount in (-1,0), # Bug #543885
+ 'cursor.rowcount should be -1 or 0 after executing no-result '
+ 'statements'
+ )
+ cur.execute("%s into %sbooze values ('Victoria Bitter')" % (
+ self.insert, self.table_prefix
+ ))
+ _failUnless(self,cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number or rows inserted, or '
+ 'set to -1 after executing an insert statement'
+ )
+ cur.execute("select name from %sbooze" % self.table_prefix)
+ _failUnless(self,cur.rowcount in (-1,1),
+ 'cursor.rowcount should == number of rows returned, or '
+ 'set to -1 after executing a select statement'
+ )
+ self.executeDDL2(cur)
+ _failUnless(self,cur.rowcount in (-1,0), # Bug #543885
+ 'cursor.rowcount should be -1 or 0 after executing no-result '
+ 'statements'
+ )
+ finally:
+ con.close()
+
+ lower_func = 'lower'
+ def test_callproc(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if self.lower_func and hasattr(cur,'callproc'):
+ r = cur.callproc(self.lower_func,('FOO',))
+ self.assertEqual(len(r),1)
+ self.assertEqual(r[0],'FOO')
+ r = cur.fetchall()
+ self.assertEqual(len(r),1,'callproc produced no result set')
+ self.assertEqual(len(r[0]),1,
+ 'callproc produced invalid result set'
+ )
+ self.assertEqual(r[0][0],'foo',
+ 'callproc produced invalid results'
+ )
+ finally:
+ con.close()
+
+ def test_close(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ finally:
+ con.close()
+
+ # cursor.execute should raise an Error if called after connection
+ # closed
+ self.assertRaises(self.driver.Error,self.executeDDL1,cur)
+
+ # connection.commit should raise an Error if called after connection'
+ # closed.'
+ self.assertRaises(self.driver.Error,con.commit)
+
+ def test_non_idempotent_close(self):
+ con = self._connect()
+ con.close()
+ # connection.close should raise an Error if called more than once
+ #!!! reasonable persons differ about the usefulness of this test and this feature !!!
+ self.assertRaises(self.driver.Error,con.close)
+
+ def test_execute(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self._paraminsert(cur)
+ finally:
+ con.close()
+
+ def _paraminsert(self,cur):
+ self.executeDDL2(cur)
+ cur.execute("%s into %sbarflys values ('Victoria Bitter', 'thi%%s :may ca%%(u)se? troub:1e')" % (
+ self.insert, self.table_prefix
+ ))
+ _failUnless(self,cur.rowcount in (-1,1))
+
+ if self.driver.paramstyle == 'qmark':
+ cur.execute(
+ "%s into %sbarflys values (?, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix),
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.execute(
+ "%s into %sbarflys values (:1, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix),
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.execute(
+ "%s into %sbarflys values (:beer, 'thi%%s :may ca%%(u)se? troub:1e')" % (self.insert, self.table_prefix),
+ {'beer':"Cooper's"}
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.execute(
+ "%s into %sbarflys values (%%s, 'thi%%%%s :may ca%%%%(u)se? troub:1e')" % (self.insert, self.table_prefix),
+ ("Cooper's",)
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.execute(
+ "%s into %sbarflys values (%%(beer)s, 'thi%%%%s :may ca%%%%(u)se? troub:1e')" % (self.insert, self.table_prefix),
+ {'beer':"Cooper's"}
+ )
+ else:
+ self.fail('Invalid paramstyle')
+ _failUnless(self,cur.rowcount in (-1,1))
+
+ cur.execute('select name, drink from %sbarflys' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Cooper's",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+ self.assertEqual(beers[1],"Victoria Bitter",
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly'
+ )
+ trouble = "thi%s :may ca%(u)se? troub:1e"
+ self.assertEqual(res[0][1], trouble,
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly. Got=%s, Expected=%s' % (repr(res[0][1]), repr(trouble)))
+ self.assertEqual(res[1][1], trouble,
+ 'cursor.fetchall retrieved incorrect data, or data inserted '
+ 'incorrectly. Got=%s, Expected=%s' % (repr(res[1][1]), repr(trouble)
+ ))
+
+ def test_executemany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ largs = [ ("Cooper's",) , ("Boag's",) ]
+ margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
+ if self.driver.paramstyle == 'qmark':
+ cur.executemany(
+ '%s into %sbooze values (?)' % (self.insert, self.table_prefix),
+ largs
+ )
+ elif self.driver.paramstyle == 'numeric':
+ cur.executemany(
+ '%s into %sbooze values (:1)' % (self.insert, self.table_prefix),
+ largs
+ )
+ elif self.driver.paramstyle == 'named':
+ cur.executemany(
+ '%s into %sbooze values (:beer)' % (self.insert, self.table_prefix),
+ margs
+ )
+ elif self.driver.paramstyle == 'format':
+ cur.executemany(
+ '%s into %sbooze values (%%s)' % (self.insert, self.table_prefix),
+ largs
+ )
+ elif self.driver.paramstyle == 'pyformat':
+ cur.executemany(
+ '%s into %sbooze values (%%(beer)s)' % (
+ self.insert, self.table_prefix
+ ),
+ margs
+ )
+ else:
+ self.fail('Unknown paramstyle')
+ _failUnless(self,cur.rowcount in (-1,2),
+ 'insert using cursor.executemany set cursor.rowcount to '
+ 'incorrect value %r' % cur.rowcount
+ )
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ res = cur.fetchall()
+ self.assertEqual(len(res),2,
+ 'cursor.fetchall retrieved incorrect number of rows'
+ )
+ beers = [res[0][0],res[1][0]]
+ beers.sort()
+ self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
+ self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
+ finally:
+ con.close()
+
+ def test_fetchone(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchone should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if a query retrieves '
+ 'no rows'
+ )
+ _failUnless(self,cur.rowcount in (-1,0))
+
+ # cursor.fetchone should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("%s into %sbooze values ('Victoria Bitter')" % (
+ self.insert, self.table_prefix
+ ))
+ self.assertRaises(self.driver.Error,cur.fetchone)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchone()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchone should have retrieved a single row'
+ )
+ self.assertEqual(r[0],'Victoria Bitter',
+ 'cursor.fetchone retrieved incorrect data'
+ )
+ self.assertEqual(cur.fetchone(),None,
+ 'cursor.fetchone should return None if no more rows available'
+ )
+ _failUnless(self,cur.rowcount in (-1,1))
+ finally:
+ con.close()
+
+ samples = [
+ 'Carlton Cold',
+ 'Carlton Draft',
+ 'Mountain Goat',
+ 'Redback',
+ 'Victoria Bitter',
+ 'XXXX'
+ ]
+
+ def _populate(self):
+ ''' Return a list of sql commands to setup the DB for the fetch
+ tests.
+ '''
+ populate = [
+ "%s into %sbooze values ('%s')" % (self.insert, self.table_prefix, s)
+ for s in self.samples
+ ]
+ return populate
+
+ def test_fetchmany(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+
+ # cursor.fetchmany should raise an Error if called without
+ #issuing a query
+ self.assertRaises(self.driver.Error,cur.fetchmany,4)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany()
+ self.assertEqual(len(r),1,
+ 'cursor.fetchmany retrieved incorrect number of rows, '
+ 'default of arraysize is one.'
+ )
+ cur.arraysize=10
+ r = cur.fetchmany(3) # Should get 3 rows
+ self.assertEqual(len(r),3,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should get 2 more
+ self.assertEqual(len(r),2,
+ 'cursor.fetchmany retrieved incorrect number of rows'
+ )
+ r = cur.fetchmany(4) # Should be an empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence after '
+ 'results are exhausted'
+ )
+ _failUnless(self,cur.rowcount in (-1,6))
+
+ # Same as above, using cursor.arraysize
+ cur.arraysize=4
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.fetchmany() # Should get 4 rows
+ self.assertEqual(len(r),4,
+ 'cursor.arraysize not being honoured by fetchmany'
+ )
+ r = cur.fetchmany() # Should get 2 more
+ self.assertEqual(len(r),2)
+ r = cur.fetchmany() # Should be an empty sequence
+ self.assertEqual(len(r),0)
+ _failUnless(self,cur.rowcount in (-1,6))
+
+ cur.arraysize=6
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchmany() # Should get all rows
+ _failUnless(self,cur.rowcount in (-1,6))
+ self.assertEqual(len(rows),6)
+ self.assertEqual(len(rows),6)
+ rows = [r[0] for r in rows]
+ rows.sort()
+
+ # Make sure we get the right data back out
+ for i in range(0,6):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved by cursor.fetchmany'
+ )
+
+ rows = cur.fetchmany() # Should return an empty list
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'called after the whole result set has been fetched'
+ )
+ _failUnless(self,cur.rowcount in (-1,6))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ r = cur.fetchmany() # Should get empty sequence
+ self.assertEqual(len(r),0,
+ 'cursor.fetchmany should return an empty sequence if '
+ 'query retrieved no rows'
+ )
+ _failUnless(self,cur.rowcount in (-1,0))
+
+ finally:
+ con.close()
+
+ def test_fetchall(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ # cursor.fetchall should raise an Error if called
+ # without executing a query that may return rows (such
+ # as a select)
+ self.assertRaises(self.driver.Error, cur.fetchall)
+
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ # cursor.fetchall should raise an Error if called
+ # after executing a a statement that cannot return rows
+ self.assertRaises(self.driver.Error,cur.fetchall)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows = cur.fetchall()
+ _failUnless(self,cur.rowcount in (-1,len(self.samples)))
+ self.assertEqual(len(rows),len(self.samples),
+ 'cursor.fetchall did not retrieve all rows'
+ )
+ rows = [r[0] for r in rows]
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'cursor.fetchall retrieved incorrect rows'
+ )
+ rows = cur.fetchall()
+ self.assertEqual(
+ len(rows),0,
+ 'cursor.fetchall should return an empty list if called '
+ 'after the whole result set has been fetched'
+ )
+ _failUnless(self,cur.rowcount in (-1,len(self.samples)))
+
+ self.executeDDL2(cur)
+ cur.execute('select name from %sbarflys' % self.table_prefix)
+ rows = cur.fetchall()
+ _failUnless(self,cur.rowcount in (-1,0))
+ self.assertEqual(len(rows),0,
+ 'cursor.fetchall should return an empty list if '
+ 'a select query returns no rows'
+ )
+
+ finally:
+ con.close()
+
+ def test_mixedfetch(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ for sql in self._populate():
+ cur.execute(sql)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ rows1 = cur.fetchone()
+ rows23 = cur.fetchmany(2)
+ rows4 = cur.fetchone()
+ rows56 = cur.fetchall()
+ _failUnless(self,cur.rowcount in (-1,6))
+ self.assertEqual(len(rows23),2,
+ 'fetchmany returned incorrect number of rows'
+ )
+ self.assertEqual(len(rows56),2,
+ 'fetchall returned incorrect number of rows'
+ )
+
+ rows = [rows1[0]]
+ rows.extend([rows23[0][0],rows23[1][0]])
+ rows.append(rows4[0])
+ rows.extend([rows56[0][0],rows56[1][0]])
+ rows.sort()
+ for i in range(0,len(self.samples)):
+ self.assertEqual(rows[i],self.samples[i],
+ 'incorrect data retrieved or inserted'
+ )
+ finally:
+ con.close()
+
+ def help_nextset_setUp(self,cur):
+ ''' Should create a procedure called deleteme
+ that returns two result sets, first the
+ number of rows in booze then "name from booze"
+ '''
+ raise NotImplementedError('Helper not implemented')
+ #sql="""
+ # create procedure deleteme as
+ # begin
+ # select count(*) from booze
+ # select name from booze
+ # end
+ #"""
+ #cur.execute(sql)
+
+ def help_nextset_tearDown(self,cur):
+ 'If cleaning up is needed after nextSetTest'
+ raise NotImplementedError('Helper not implemented')
+ #cur.execute("drop procedure deleteme")
+
+ def test_nextset(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur,'nextset'):
+ return
+
+ try:
+ self.executeDDL1(cur)
+ sql=self._populate()
+ for sql in self._populate():
+ cur.execute(sql)
+
+ self.help_nextset_setUp(cur)
+
+ cur.callproc('deleteme')
+ numberofrows=cur.fetchone()
+ assert numberofrows[0]== len(self.samples)
+ assert cur.nextset()
+ names=cur.fetchall()
+ assert len(names) == len(self.samples)
+ s=cur.nextset()
+ assert s == None,'No more return sets, should return None'
+ finally:
+ self.help_nextset_tearDown(cur)
+
+ finally:
+ con.close()
+
+ def test_nextset(self):
+ raise NotImplementedError('Drivers need to override this test')
+
+ def test_arraysize(self):
+ # Not much here - rest of the tests for this are in test_fetchmany
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ _failUnless(self,hasattr(cur,'arraysize'),
+ 'cursor.arraysize must be defined'
+ )
+ finally:
+ con.close()
+
+ def test_setinputsizes(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setinputsizes( (25,) )
+ self._paraminsert(cur) # Make sure cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize_basic(self):
+ # Basic test is to make sure setoutputsize doesn't blow up
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ cur.setoutputsize(1000)
+ cur.setoutputsize(2000,0)
+ self._paraminsert(cur) # Make sure the cursor still works
+ finally:
+ con.close()
+
+ def test_setoutputsize(self):
+ # Real test for setoutputsize is driver dependant
+ raise NotImplementedError('Driver needed to override this test')
+
+ def test_None(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL1(cur)
+ cur.execute("%s into %sbarflys values ('a', NULL)" % (self.insert, self.table_prefix))
+ cur.execute('select drink from %sbarflys' % self.table_prefix)
+ r = cur.fetchall()
+ self.assertEqual(len(r),1)
+ self.assertEqual(len(r[0]),1)
+ self.assertEqual(r[0][0],None,'NULL value not returned as None')
+ finally:
+ con.close()
+
+ def test_Date(self):
+ d1 = self.driver.Date(2002,12,25)
+ d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(d1),str(d2))
+
+ def test_Time(self):
+ t1 = self.driver.Time(13,45,30)
+ t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Timestamp(self):
+ t1 = self.driver.Timestamp(2002,12,25,13,45,30)
+ t2 = self.driver.TimestampFromTicks(
+ time.mktime((2002,12,25,13,45,30,0,0,0))
+ )
+ # Can we assume this? API doesn't specify, but it seems implied
+ # self.assertEqual(str(t1),str(t2))
+
+ def test_Binary(self):
+ b = self.driver.Binary(str2bytes('Something'))
+ b = self.driver.Binary(str2bytes(''))
+
+ def test_STRING(self):
+ _failUnless(self, hasattr(self.driver,'STRING'),
+ 'module.STRING must be defined'
+ )
+
+ def test_BINARY(self):
+ _failUnless(self, hasattr(self.driver,'BINARY'),
+ 'module.BINARY must be defined.'
+ )
+
+ def test_NUMBER(self):
+ _failUnless(self, hasattr(self.driver,'NUMBER'),
+ 'module.NUMBER must be defined.'
+ )
+
+ def test_DATETIME(self):
+ _failUnless(self, hasattr(self.driver,'DATETIME'),
+ 'module.DATETIME must be defined.'
+ )
+
+ def test_ROWID(self):
+ _failUnless(self, hasattr(self.driver,'ROWID'),
+ 'module.ROWID must be defined.'
+ )
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_avatica.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_avatica.py b/python/phoenixdb/tests/test_avatica.py
new file mode 100644
index 0000000..6152814
--- /dev/null
+++ b/python/phoenixdb/tests/test_avatica.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from phoenixdb.avatica.client import parse_url, urlparse
+
+
+class ParseUrlTest(unittest.TestCase):
+
+ def test_parse_url(self):
+ self.assertEqual(urlparse.urlparse('http://localhost:8765/'), parse_url('localhost'))
+ self.assertEqual(urlparse.urlparse('http://localhost:2222/'), parse_url('localhost:2222'))
+ self.assertEqual(urlparse.urlparse('http://localhost:2222/'), parse_url('http://localhost:2222/'))
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_connection.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_connection.py b/python/phoenixdb/tests/test_connection.py
new file mode 100644
index 0000000..2deacf5
--- /dev/null
+++ b/python/phoenixdb/tests/test_connection.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import phoenixdb
+from phoenixdb.tests import TEST_DB_URL
+
+
+@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
+class PhoenixConnectionTest(unittest.TestCase):
+
+ def _connect(self, connect_kw_args):
+ try:
+ r = phoenixdb.connect(TEST_DB_URL, **connect_kw_args)
+ except AttributeError:
+ self.fail("Failed to connect")
+ return r
+
+ def test_connection_credentials(self):
+ connect_kw_args = {'user': 'SCOTT', 'password': 'TIGER', 'readonly': 'True'}
+ con = self._connect(connect_kw_args)
+ try:
+ self.assertEqual(
+ con._connection_args, {'user': 'SCOTT', 'password': 'TIGER'},
+ 'Should have extract user and password')
+ self.assertEqual(
+ con._filtered_args, {'readonly': 'True'},
+ 'Should have not extracted foo')
+ finally:
+ con.close()
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_db.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_db.py b/python/phoenixdb/tests/test_db.py
new file mode 100644
index 0000000..2fb1a2a
--- /dev/null
+++ b/python/phoenixdb/tests/test_db.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import phoenixdb
+import phoenixdb.cursor
+from phoenixdb.errors import InternalError
+from phoenixdb.tests import TEST_DB_URL
+
+
+@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
+class PhoenixDatabaseTest(unittest.TestCase):
+
+ def test_select_literal(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ self.addCleanup(db.close)
+
+ with db.cursor() as cursor:
+ cursor.execute("DROP TABLE IF EXISTS test")
+ cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)")
+ cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)])
+
+ with db.cursor() as cursor:
+ cursor.itersize = 4
+ cursor.execute("SELECT * FROM test WHERE id>1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)])
+
+ def test_select_parameter(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ self.addCleanup(db.close)
+
+ with db.cursor() as cursor:
+ cursor.execute("DROP TABLE IF EXISTS test")
+ cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)")
+ cursor.executemany("UPSERT INTO test VALUES (?, ?)", [[i, 'text {}'.format(i)] for i in range(10)])
+
+ with db.cursor() as cursor:
+ cursor.itersize = 4
+ cursor.execute("SELECT * FROM test WHERE id>? ORDER BY id", [1])
+ self.assertEqual(cursor.fetchall(), [[i, 'text {}'.format(i)] for i in range(2, 10)])
+
+ def _check_dict_cursor(self, cursor):
+ cursor.execute("DROP TABLE IF EXISTS test")
+ cursor.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text VARCHAR)")
+ cursor.execute("UPSERT INTO test VALUES (?, ?)", [1, 'text 1'])
+ cursor.execute("SELECT * FROM test ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [{'ID': 1, 'TEXT': 'text 1'}])
+
+ def test_dict_cursor_default_parameter(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True, cursor_factory=phoenixdb.cursor.DictCursor)
+ self.addCleanup(db.close)
+
+ with db.cursor() as cursor:
+ self._check_dict_cursor(cursor)
+
+ def test_dict_cursor_default_attribute(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ db.cursor_factory = phoenixdb.cursor.DictCursor
+ self.addCleanup(db.close)
+
+ with db.cursor() as cursor:
+ self._check_dict_cursor(cursor)
+
+ def test_dict_cursor(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ self.addCleanup(db.close)
+
+ with db.cursor(cursor_factory=phoenixdb.cursor.DictCursor) as cursor:
+ self._check_dict_cursor(cursor)
+
+ def test_schema(self):
+ db = phoenixdb.connect(TEST_DB_URL, autocommit=True)
+ self.addCleanup(db.close)
+
+ with db.cursor() as cursor:
+ try:
+ cursor.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
+ except InternalError as e:
+ if "phoenix.schema.isNamespaceMappingEnabled" in e.message:
+ self.skipTest(e.message)
+ raise
+
+ cursor.execute("DROP TABLE IF EXISTS test_schema.test")
+ cursor.execute("CREATE TABLE test_schema.test (id INTEGER PRIMARY KEY, text VARCHAR)")
+ cursor.execute("UPSERT INTO test_schema.test VALUES (?, ?)", [1, 'text 1'])
+ cursor.execute("SELECT * FROM test_schema.test ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, 'text 1']])
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_dbapi20.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_dbapi20.py b/python/phoenixdb/tests/test_dbapi20.py
new file mode 100644
index 0000000..0e5c2e4
--- /dev/null
+++ b/python/phoenixdb/tests/test_dbapi20.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import phoenixdb
+from . import dbapi20
+from phoenixdb.tests import TEST_DB_URL
+
+
+@unittest.skipIf(TEST_DB_URL is None, "these tests require the PHOENIXDB_TEST_DB_URL environment variable set to a clean database")
+class PhoenixDatabaseAPI20Test(dbapi20.DatabaseAPI20Test):
+ driver = phoenixdb
+ connect_args = (TEST_DB_URL, )
+
+ ddl1 = 'create table %sbooze (name varchar(20) primary key)' % dbapi20.DatabaseAPI20Test.table_prefix
+ ddl2 = 'create table %sbarflys (name varchar(20) primary key, drink varchar(30))' % dbapi20.DatabaseAPI20Test.table_prefix
+ insert = 'upsert'
+
+ def test_nextset(self):
+ pass
+
+ def test_setoutputsize(self):
+ pass
+
+ def _connect(self):
+ con = dbapi20.DatabaseAPI20Test._connect(self)
+ con.autocommit = True
+ return con
+
+ def test_None(self):
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ self.executeDDL2(cur)
+ cur.execute("%s into %sbarflys values ('a', NULL)" % (self.insert, self.table_prefix))
+ cur.execute('select drink from %sbarflys' % self.table_prefix)
+ r = cur.fetchall()
+ self.assertEqual(len(r), 1)
+ self.assertEqual(len(r[0]), 1)
+ self.assertEqual(r[0][0], None, 'NULL value not returned as None')
+ finally:
+ con.close()
+
+ def test_autocommit(self):
+ con = dbapi20.DatabaseAPI20Test._connect(self)
+ self.assertFalse(con.autocommit)
+ con.autocommit = True
+ self.assertTrue(con.autocommit)
+ con.autocommit = False
+ self.assertFalse(con.autocommit)
+ con.close()
+
+ def test_readonly(self):
+ con = dbapi20.DatabaseAPI20Test._connect(self)
+ self.assertFalse(con.readonly)
+ con.readonly = True
+ self.assertTrue(con.readonly)
+ con.readonly = False
+ self.assertFalse(con.readonly)
+ con.close()
+
+ def test_iter(self):
+ # https://www.python.org/dev/peps/pep-0249/#iter
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if hasattr(cur, '__iter__'):
+ self.assertIs(cur, iter(cur))
+ finally:
+ con.close()
+
+ def test_next(self):
+ # https://www.python.org/dev/peps/pep-0249/#next
+ con = self._connect()
+ try:
+ cur = con.cursor()
+ if not hasattr(cur, 'next'):
+ return
+
+ # cursor.next should raise an Error if called before
+ # executing a select-type query
+ self.assertRaises(self.driver.Error, cur.next)
+
+ # cursor.next should raise an Error if called after
+ # executing a query that cannnot return rows
+ self.executeDDL1(cur)
+ self.assertRaises(self.driver.Error, cur.next)
+
+ # cursor.next should return None if a query retrieves '
+ # no rows
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ self.assertRaises(StopIteration, cur.next)
+ self.failUnless(cur.rowcount in (-1, 0))
+
+ # cursor.next should raise an Error if called after
+ # executing a query that cannnot return rows
+ cur.execute("%s into %sbooze values ('Victoria Bitter')" % (
+ self.insert, self.table_prefix
+ ))
+ self.assertRaises(self.driver.Error, cur.next)
+
+ cur.execute('select name from %sbooze' % self.table_prefix)
+ r = cur.next()
+ self.assertEqual(len(r), 1, 'cursor.next should have retrieved a row with one column')
+ self.assertEqual(r[0], 'Victoria Bitter', 'cursor.next retrieved incorrect data')
+ # cursor.next should raise StopIteration if no more rows available
+ self.assertRaises(StopIteration, cur.next)
+ self.failUnless(cur.rowcount in (-1, 1))
+ finally:
+ con.close()
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_errors.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_errors.py b/python/phoenixdb/tests/test_errors.py
new file mode 100644
index 0000000..191ccb1
--- /dev/null
+++ b/python/phoenixdb/tests/test_errors.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from phoenixdb.tests import DatabaseTestCase
+
+
+class ProgrammingErrorTest(DatabaseTestCase):
+
+ def test_invalid_sql(self):
+ with self.conn.cursor() as cursor:
+ with self.assertRaises(self.conn.ProgrammingError) as cm:
+ cursor.execute("UPS")
+ self.assertEqual("Syntax error. Encountered \"UPS\" at line 1, column 1.", cm.exception.message)
+ self.assertEqual(601, cm.exception.code)
+ self.assertEqual("42P00", cm.exception.sqlstate)
+
+
+class IntegrityErrorTest(DatabaseTestCase):
+
+ def test_null_in_pk(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key")
+ with self.conn.cursor() as cursor:
+ with self.assertRaises(self.conn.IntegrityError) as cm:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (NULL)")
+ self.assertEqual("Constraint violation. PHOENIXDB_TEST_TBL1.ID may not be null", cm.exception.message)
+ self.assertEqual(218, cm.exception.code)
+ self.assertIn(cm.exception.sqlstate, ("22018", "23018"))
+
+
+class DataErrorTest(DatabaseTestCase):
+
+ def test_number_outside_of_range(self):
+ self.createTable("phoenixdb_test_tbl1", "id tinyint primary key")
+ with self.conn.cursor() as cursor:
+ with self.assertRaises(self.conn.DataError) as cm:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (10000)")
+ self.assertEqual("Type mismatch. TINYINT and INTEGER for 10000", cm.exception.message)
+ self.assertEqual(203, cm.exception.code)
+ self.assertEqual("22005", cm.exception.sqlstate)
+
+ def test_division_by_zero(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key")
+ with self.conn.cursor() as cursor:
+ with self.assertRaises(self.conn.DataError) as cm:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2/0)")
+ self.assertEqual("Divide by zero.", cm.exception.message)
+ self.assertEqual(202, cm.exception.code)
+ self.assertEqual("22012", cm.exception.sqlstate)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ba6abf7e/python/phoenixdb/tests/test_types.py
----------------------------------------------------------------------
diff --git a/python/phoenixdb/tests/test_types.py b/python/phoenixdb/tests/test_types.py
new file mode 100644
index 0000000..2cef0f2
--- /dev/null
+++ b/python/phoenixdb/tests/test_types.py
@@ -0,0 +1,327 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import unittest
+import datetime
+import phoenixdb
+from decimal import Decimal
+from phoenixdb.tests import DatabaseTestCase
+
+
+class TypesTest(DatabaseTestCase):
+
+ def checkIntType(self, type_name, min_value, max_value):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val {}".format(type_name))
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 1)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [1])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [min_value])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [max_value])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER)
+ self.assertEqual(cursor.fetchall(), [[1, 1], [2, None], [3, 1], [4, None], [5, min_value], [6, max_value]])
+
+ self.assertRaises(
+ self.conn.DatabaseError, cursor.execute,
+ "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(min_value - 1))
+
+ self.assertRaises(
+ self.conn.DatabaseError, cursor.execute,
+ "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, {})".format(max_value + 1))
+
+ # XXX The server silently truncates the values
+# self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [min_value - 1])
+# self.assertRaises(self.conn.DatabaseError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [max_value + 1])
+
+ def test_integer(self):
+ self.checkIntType("integer", -2147483648, 2147483647)
+
+ def test_unsigned_int(self):
+ self.checkIntType("unsigned_int", 0, 2147483647)
+
+ def test_bigint(self):
+ self.checkIntType("bigint", -9223372036854775808, 9223372036854775807)
+
+ def test_unsigned_long(self):
+ self.checkIntType("unsigned_long", 0, 9223372036854775807)
+
+ def test_tinyint(self):
+ self.checkIntType("tinyint", -128, 127)
+
+ def test_unsigned_tinyint(self):
+ self.checkIntType("unsigned_tinyint", 0, 127)
+
+ def test_smallint(self):
+ self.checkIntType("smallint", -32768, 32767)
+
+ def test_unsigned_smallint(self):
+ self.checkIntType("unsigned_smallint", 0, 32767)
+
+ def checkFloatType(self, type_name, min_value, max_value):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val {}".format(type_name))
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 1)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [1])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [min_value])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [max_value])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER)
+ rows = cursor.fetchall()
+ self.assertEqual([r[0] for r in rows], [1, 2, 3, 4, 5, 6])
+ self.assertEqual(rows[0][1], 1.0)
+ self.assertEqual(rows[1][1], None)
+ self.assertEqual(rows[2][1], 1.0)
+ self.assertEqual(rows[3][1], None)
+ self.assertAlmostEqual(rows[4][1], min_value)
+ self.assertAlmostEqual(rows[5][1], max_value)
+
+ def test_float(self):
+ self.checkFloatType("float", -3.4028234663852886e+38, 3.4028234663852886e+38)
+
+ def test_unsigned_float(self):
+ self.checkFloatType("unsigned_float", 0, 3.4028234663852886e+38)
+
+ def test_double(self):
+ self.checkFloatType("double", -1.7976931348623158E+308, 1.7976931348623158E+308)
+
+ def test_unsigned_double(self):
+ self.checkFloatType("unsigned_double", 0, 1.7976931348623158E+308)
+
+ def test_decimal(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val decimal(8,3)")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 33333.333)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [33333.333])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [Decimal('33333.333')])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.description[1].type_code, phoenixdb.NUMBER)
+ rows = cursor.fetchall()
+ self.assertEqual([r[0] for r in rows], [1, 2, 3, 4, 5])
+ self.assertEqual(rows[0][1], Decimal('33333.333'))
+ self.assertEqual(rows[1][1], None)
+ self.assertEqual(rows[2][1], Decimal('33333.333'))
+ self.assertEqual(rows[3][1], Decimal('33333.333'))
+ self.assertEqual(rows[4][1], None)
+ self.assertRaises(
+ self.conn.DatabaseError, cursor.execute,
+ "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, ?)", [Decimal('1234567890')])
+ self.assertRaises(
+ self.conn.DatabaseError, cursor.execute,
+ "UPSERT INTO phoenixdb_test_tbl1 VALUES (101, ?)", [Decimal('123456.789')])
+
+ def test_boolean(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val boolean")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, TRUE)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, FALSE)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [True])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [False])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [None])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.description[1].type_code, phoenixdb.BOOLEAN)
+ self.assertEqual(cursor.fetchall(), [[1, True], [2, False], [3, None], [4, True], [5, False], [6, None]])
+
+ def test_time(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val time")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '1970-01-01 12:01:02')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Time(12, 1, 2)])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.time(12, 1, 2)])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.time(12, 1, 2)],
+ [2, None],
+ [3, datetime.time(12, 1, 2)],
+ [4, datetime.time(12, 1, 2)],
+ [5, None],
+ ])
+
+ @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-797")
+ def test_time_full(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val time")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ [2, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ ])
+
+ def test_date(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 00:00:00')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Date(2015, 7, 12)])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.date(2015, 7, 12)])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.date(2015, 7, 12)],
+ [3, datetime.date(2015, 7, 12)],
+ [4, datetime.date(2015, 7, 12)],
+ ])
+
+ @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-798")
+ def test_date_full(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ [2, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ ])
+
+ def test_date_null(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val date")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [None])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id") # raises NullPointerException on the server
+ self.assertEqual(cursor.fetchall(), [
+ [1, None],
+ [2, None],
+ ])
+
+ def test_timestamp(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val timestamp")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", [phoenixdb.Timestamp(2015, 7, 12, 13, 1, 2)])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, ?)", [None])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ [2, None],
+ [3, datetime.datetime(2015, 7, 12, 13, 1, 2)],
+ [4, datetime.datetime(2015, 7, 12, 13, 1, 2, 123000)],
+ [5, None],
+ ])
+
+ @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-796")
+ def test_timestamp_full(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val timestamp")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, '2015-07-12 13:01:02.123456789')")
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, datetime.datetime(2015, 7, 12, 13, 1, 2, 123456789)],
+ ])
+
+ def test_varchar(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'abc')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", ['abc'])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [''])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, 'abc'], [2, None], [3, 'abc'], [4, None], [5, None], [6, None]])
+
+ def test_varchar_very_long(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar")
+ with self.conn.cursor() as cursor:
+ value = '1234567890' * 1000
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ?)", [value])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, value]])
+
+ def test_varchar_limited(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val varchar(2)")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, ?)", ['ab'])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [''])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, 'ab'], [2, None], [3, 'ab'], [4, None], [5, None], [6, None]])
+ self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')")
+
+ def test_char_null(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val char(2)")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, NULL)")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [None])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (5, '')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (6, ?)", [''])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[2, None], [4, None], [5, None], [6, None]])
+ self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')")
+
+ def test_char(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val char(2)")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", ['ab'])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, 'a')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", ['b'])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, 'ab'], [2, 'ab'], [3, 'a'], [4, 'b']])
+ self.assertRaises(self.conn.DataError, cursor.execute, "UPSERT INTO phoenixdb_test_tbl1 VALUES (100, 'abc')")
+
+ def test_binary(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(2)")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, 'ab')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [phoenixdb.Binary(b'ab')])
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (3, '\x01\x00')")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (4, ?)", [phoenixdb.Binary(b'\x01\x00')])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, b'ab'],
+ [2, b'ab'],
+ [3, b'\x01\x00'],
+ [4, b'\x01\x00'],
+ ])
+
+ def test_binary_all_bytes(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val binary(256)")
+ with self.conn.cursor() as cursor:
+ if sys.version_info[0] < 3:
+ value = ''.join(map(chr, range(256)))
+ else:
+ value = bytes(range(256))
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ?)", [phoenixdb.Binary(value)])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [[1, value]])
+
+ @unittest.skip("https://issues.apache.org/jira/browse/CALCITE-1050 https://issues.apache.org/jira/browse/PHOENIX-2585")
+ def test_array(self):
+ self.createTable("phoenixdb_test_tbl1", "id integer primary key, val integer[]")
+ with self.conn.cursor() as cursor:
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (1, ARRAY[1, 2])")
+ cursor.execute("UPSERT INTO phoenixdb_test_tbl1 VALUES (2, ?)", [[2, 3]])
+ cursor.execute("SELECT id, val FROM phoenixdb_test_tbl1 ORDER BY id")
+ self.assertEqual(cursor.fetchall(), [
+ [1, [1, 2]],
+ [2, [2, 3]],
+ ])