You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ed...@apache.org on 2012/06/22 19:58:04 UTC
[5/7] remove tools/testClient as they are moved to
tools/marvin/marvin already
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/converters.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/converters.py b/tools/testClient/pymysql/converters.py
deleted file mode 100644
index c4376dc..0000000
--- a/tools/testClient/pymysql/converters.py
+++ /dev/null
@@ -1,360 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import re
-import datetime
-import time
-import sys
-
-from constants import FIELD_TYPE, FLAG
-from charset import charset_by_id
-
-PYTHON3 = sys.version_info[0] > 2
-
-try:
- set
-except NameError:
- try:
- from sets import BaseSet as set
- except ImportError:
- from sets import Set as set
-
-ESCAPE_REGEX = re.compile(r"[\0\n\r\032\'\"\\]")
-ESCAPE_MAP = {'\0': '\\0', '\n': '\\n', '\r': '\\r', '\032': '\\Z',
- '\'': '\\\'', '"': '\\"', '\\': '\\\\'}
-
-def escape_item(val, charset):
- if type(val) in [tuple, list, set]:
- return escape_sequence(val, charset)
- if type(val) is dict:
- return escape_dict(val, charset)
- if PYTHON3 and hasattr(val, "decode") and not isinstance(val, unicode):
- # deal with py3k bytes
- val = val.decode(charset)
- encoder = encoders[type(val)]
- val = encoder(val)
- if type(val) is str:
- return val
- val = val.encode(charset)
- return val
-
-def escape_dict(val, charset):
- n = {}
- for k, v in val.items():
- quoted = escape_item(v, charset)
- n[k] = quoted
- return n
-
-def escape_sequence(val, charset):
- n = []
- for item in val:
- quoted = escape_item(item, charset)
- n.append(quoted)
- return "(" + ",".join(n) + ")"
-
-def escape_set(val, charset):
- val = map(lambda x: escape_item(x, charset), val)
- return ','.join(val)
-
-def escape_bool(value):
- return str(int(value))
-
-def escape_object(value):
- return str(value)
-
-escape_int = escape_long = escape_object
-
-def escape_float(value):
- return ('%.15g' % value)
-
-def escape_string(value):
- return ("'%s'" % ESCAPE_REGEX.sub(
- lambda match: ESCAPE_MAP.get(match.group(0)), value))
-
-def escape_unicode(value):
- return escape_string(value)
-
-def escape_None(value):
- return 'NULL'
-
-def escape_timedelta(obj):
- seconds = int(obj.seconds) % 60
- minutes = int(obj.seconds // 60) % 60
- hours = int(obj.seconds // 3600) % 24 + int(obj.days) * 24
- return escape_string('%02d:%02d:%02d' % (hours, minutes, seconds))
-
-def escape_time(obj):
- s = "%02d:%02d:%02d" % (int(obj.hour), int(obj.minute),
- int(obj.second))
- if obj.microsecond:
- s += ".%f" % obj.microsecond
-
- return escape_string(s)
-
-def escape_datetime(obj):
- return escape_string(obj.strftime("%Y-%m-%d %H:%M:%S"))
-
-def escape_date(obj):
- return escape_string(obj.strftime("%Y-%m-%d"))
-
-def escape_struct_time(obj):
- return escape_datetime(datetime.datetime(*obj[:6]))
-
-def convert_datetime(connection, field, obj):
- """Returns a DATETIME or TIMESTAMP column value as a datetime object:
-
- >>> datetime_or_None('2007-02-25 23:06:20')
- datetime.datetime(2007, 2, 25, 23, 6, 20)
- >>> datetime_or_None('2007-02-25T23:06:20')
- datetime.datetime(2007, 2, 25, 23, 6, 20)
-
- Illegal values are returned as None:
-
- >>> datetime_or_None('2007-02-31T23:06:20') is None
- True
- >>> datetime_or_None('0000-00-00 00:00:00') is None
- True
-
- """
- if not isinstance(obj, unicode):
- obj = obj.decode(connection.charset)
- if ' ' in obj:
- sep = ' '
- elif 'T' in obj:
- sep = 'T'
- else:
- return convert_date(connection, field, obj)
-
- try:
- ymd, hms = obj.split(sep, 1)
- return datetime.datetime(*[ int(x) for x in ymd.split('-')+hms.split(':') ])
- except ValueError:
- return convert_date(connection, field, obj)
-
-def convert_timedelta(connection, field, obj):
- """Returns a TIME column as a timedelta object:
-
- >>> timedelta_or_None('25:06:17')
- datetime.timedelta(1, 3977)
- >>> timedelta_or_None('-25:06:17')
- datetime.timedelta(-2, 83177)
-
- Illegal values are returned as None:
-
- >>> timedelta_or_None('random crap') is None
- True
-
- Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
- can accept values as (+|-)DD HH:MM:SS. The latter format will not
- be parsed correctly by this function.
- """
- from math import modf
- try:
- if not isinstance(obj, unicode):
- obj = obj.decode(connection.charset)
- hours, minutes, seconds = tuple([int(x) for x in obj.split(':')])
- tdelta = datetime.timedelta(
- hours = int(hours),
- minutes = int(minutes),
- seconds = int(seconds),
- microseconds = int(modf(float(seconds))[0]*1000000),
- )
- return tdelta
- except ValueError:
- return None
-
-def convert_time(connection, field, obj):
- """Returns a TIME column as a time object:
-
- >>> time_or_None('15:06:17')
- datetime.time(15, 6, 17)
-
- Illegal values are returned as None:
-
- >>> time_or_None('-25:06:17') is None
- True
- >>> time_or_None('random crap') is None
- True
-
- Note that MySQL always returns TIME columns as (+|-)HH:MM:SS, but
- can accept values as (+|-)DD HH:MM:SS. The latter format will not
- be parsed correctly by this function.
-
- Also note that MySQL's TIME column corresponds more closely to
- Python's timedelta and not time. However if you want TIME columns
- to be treated as time-of-day and not a time offset, then you can
- use set this function as the converter for FIELD_TYPE.TIME.
- """
- from math import modf
- try:
- hour, minute, second = obj.split(':')
- return datetime.time(hour=int(hour), minute=int(minute),
- second=int(second),
- microsecond=int(modf(float(second))[0]*1000000))
- except ValueError:
- return None
-
-def convert_date(connection, field, obj):
- """Returns a DATE column as a date object:
-
- >>> date_or_None('2007-02-26')
- datetime.date(2007, 2, 26)
-
- Illegal values are returned as None:
-
- >>> date_or_None('2007-02-31') is None
- True
- >>> date_or_None('0000-00-00') is None
- True
-
- """
- try:
- if not isinstance(obj, unicode):
- obj = obj.decode(connection.charset)
- return datetime.date(*[ int(x) for x in obj.split('-', 2) ])
- except ValueError:
- return None
-
-def convert_mysql_timestamp(connection, field, timestamp):
- """Convert a MySQL TIMESTAMP to a Timestamp object.
-
- MySQL >= 4.1 returns TIMESTAMP in the same format as DATETIME:
-
- >>> mysql_timestamp_converter('2007-02-25 22:32:17')
- datetime.datetime(2007, 2, 25, 22, 32, 17)
-
- MySQL < 4.1 uses a big string of numbers:
-
- >>> mysql_timestamp_converter('20070225223217')
- datetime.datetime(2007, 2, 25, 22, 32, 17)
-
- Illegal values are returned as None:
-
- >>> mysql_timestamp_converter('2007-02-31 22:32:17') is None
- True
- >>> mysql_timestamp_converter('00000000000000') is None
- True
-
- """
- if not isinstance(timestamp, unicode):
- timestamp = timestamp.decode(connection.charset)
-
- if timestamp[4] == '-':
- return convert_datetime(connection, field, timestamp)
- timestamp += "0"*(14-len(timestamp)) # padding
- year, month, day, hour, minute, second = \
- int(timestamp[:4]), int(timestamp[4:6]), int(timestamp[6:8]), \
- int(timestamp[8:10]), int(timestamp[10:12]), int(timestamp[12:14])
- try:
- return datetime.datetime(year, month, day, hour, minute, second)
- except ValueError:
- return None
-
-def convert_set(s):
- return set(s.split(","))
-
-def convert_bit(connection, field, b):
- #b = "\x00" * (8 - len(b)) + b # pad w/ zeroes
- #return struct.unpack(">Q", b)[0]
- #
- # the snippet above is right, but MySQLdb doesn't process bits,
- # so we shouldn't either
- return b
-
-def convert_characters(connection, field, data):
- field_charset = charset_by_id(field.charsetnr).name
- if field.flags & FLAG.SET:
- return convert_set(data.decode(field_charset))
- if field.flags & FLAG.BINARY:
- return data
-
- if connection.use_unicode:
- data = data.decode(field_charset)
- elif connection.charset != field_charset:
- data = data.decode(field_charset)
- data = data.encode(connection.charset)
- return data
-
-def convert_int(connection, field, data):
- return int(data)
-
-def convert_long(connection, field, data):
- return long(data)
-
-def convert_float(connection, field, data):
- return float(data)
-
-encoders = {
- bool: escape_bool,
- int: escape_int,
- long: escape_long,
- float: escape_float,
- str: escape_string,
- unicode: escape_unicode,
- tuple: escape_sequence,
- list:escape_sequence,
- set:escape_sequence,
- dict:escape_dict,
- type(None):escape_None,
- datetime.date: escape_date,
- datetime.datetime : escape_datetime,
- datetime.timedelta : escape_timedelta,
- datetime.time : escape_time,
- time.struct_time : escape_struct_time,
- }
-
-decoders = {
- FIELD_TYPE.BIT: convert_bit,
- FIELD_TYPE.TINY: convert_int,
- FIELD_TYPE.SHORT: convert_int,
- FIELD_TYPE.LONG: convert_long,
- FIELD_TYPE.FLOAT: convert_float,
- FIELD_TYPE.DOUBLE: convert_float,
- FIELD_TYPE.DECIMAL: convert_float,
- FIELD_TYPE.NEWDECIMAL: convert_float,
- FIELD_TYPE.LONGLONG: convert_long,
- FIELD_TYPE.INT24: convert_int,
- FIELD_TYPE.YEAR: convert_int,
- FIELD_TYPE.TIMESTAMP: convert_mysql_timestamp,
- FIELD_TYPE.DATETIME: convert_datetime,
- FIELD_TYPE.TIME: convert_timedelta,
- FIELD_TYPE.DATE: convert_date,
- FIELD_TYPE.SET: convert_set,
- FIELD_TYPE.BLOB: convert_characters,
- FIELD_TYPE.TINY_BLOB: convert_characters,
- FIELD_TYPE.MEDIUM_BLOB: convert_characters,
- FIELD_TYPE.LONG_BLOB: convert_characters,
- FIELD_TYPE.STRING: convert_characters,
- FIELD_TYPE.VAR_STRING: convert_characters,
- FIELD_TYPE.VARCHAR: convert_characters,
- #FIELD_TYPE.BLOB: str,
- #FIELD_TYPE.STRING: str,
- #FIELD_TYPE.VAR_STRING: str,
- #FIELD_TYPE.VARCHAR: str
- }
-conversions = decoders # for MySQLdb compatibility
-
-try:
- # python version > 2.3
- from decimal import Decimal
- def convert_decimal(connection, field, data):
- data = data.decode(connection.charset)
- return Decimal(data)
- decoders[FIELD_TYPE.DECIMAL] = convert_decimal
- decoders[FIELD_TYPE.NEWDECIMAL] = convert_decimal
-
- def escape_decimal(obj):
- return unicode(obj)
- encoders[Decimal] = escape_decimal
-
-except ImportError:
- pass
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/cursors.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/cursors.py b/tools/testClient/pymysql/cursors.py
deleted file mode 100644
index 591f92b..0000000
--- a/tools/testClient/pymysql/cursors.py
+++ /dev/null
@@ -1,309 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import struct
-import re
-
-try:
- import cStringIO as StringIO
-except ImportError:
- import StringIO
-
-from err import Warning, Error, InterfaceError, DataError, \
- DatabaseError, OperationalError, IntegrityError, InternalError, \
- NotSupportedError, ProgrammingError
-
-insert_values = re.compile(r'\svalues\s*(\(.+\))', re.IGNORECASE)
-
-class Cursor(object):
- '''
- This is the object you use to interact with the database.
- '''
- def __init__(self, connection):
- '''
- Do not create an instance of a Cursor yourself. Call
- connections.Connection.cursor().
- '''
- from weakref import proxy
- self.connection = proxy(connection)
- self.description = None
- self.rownumber = 0
- self.rowcount = -1
- self.arraysize = 1
- self._executed = None
- self.messages = []
- self.errorhandler = connection.errorhandler
- self._has_next = None
- self._rows = ()
-
- def __del__(self):
- '''
- When this gets GC'd close it.
- '''
- self.close()
-
- def close(self):
- '''
- Closing a cursor just exhausts all remaining data.
- '''
- if not self.connection:
- return
- try:
- while self.nextset():
- pass
- except:
- pass
-
- self.connection = None
-
- def _get_db(self):
- if not self.connection:
- self.errorhandler(self, ProgrammingError, "cursor closed")
- return self.connection
-
- def _check_executed(self):
- if not self._executed:
- self.errorhandler(self, ProgrammingError, "execute() first")
-
- def setinputsizes(self, *args):
- """Does nothing, required by DB API."""
-
- def setoutputsizes(self, *args):
- """Does nothing, required by DB API."""
-
- def nextset(self):
- ''' Get the next query set '''
- if self._executed:
- self.fetchall()
- del self.messages[:]
-
- if not self._has_next:
- return None
- connection = self._get_db()
- connection.next_result()
- self._do_get_result()
- return True
-
- def execute(self, query, args=None):
- ''' Execute a query '''
- from sys import exc_info
-
- conn = self._get_db()
- charset = conn.charset
- del self.messages[:]
-
- # TODO: make sure that conn.escape is correct
-
- if args is not None:
- if isinstance(args, tuple) or isinstance(args, list):
- escaped_args = tuple(conn.escape(arg) for arg in args)
- elif isinstance(args, dict):
- escaped_args = dict((key, conn.escape(val)) for (key, val) in args.items())
- else:
- #If it's not a dictionary let's try escaping it anyways.
- #Worst case it will throw a Value error
- escaped_args = conn.escape(args)
-
- query = query % escaped_args
-
- if isinstance(query, unicode):
- query = query.encode(charset)
-
- result = 0
- try:
- result = self._query(query)
- except:
- exc, value, tb = exc_info()
- del tb
- self.messages.append((exc,value))
- self.errorhandler(self, exc, value)
-
- self._executed = query
- return result
-
- def executemany(self, query, args):
- ''' Run several data against one query '''
- del self.messages[:]
- #conn = self._get_db()
- if not args:
- return
- #charset = conn.charset
- #if isinstance(query, unicode):
- # query = query.encode(charset)
-
- self.rowcount = sum([ self.execute(query, arg) for arg in args ])
- return self.rowcount
-
-
- def callproc(self, procname, args=()):
- """Execute stored procedure procname with args
-
- procname -- string, name of procedure to execute on server
-
- args -- Sequence of parameters to use with procedure
-
- Returns the original args.
-
- Compatibility warning: PEP-249 specifies that any modified
- parameters must be returned. This is currently impossible
- as they are only available by storing them in a server
- variable and then retrieved by a query. Since stored
- procedures return zero or more result sets, there is no
- reliable way to get at OUT or INOUT parameters via callproc.
- The server variables are named @_procname_n, where procname
- is the parameter above and n is the position of the parameter
- (from zero). Once all result sets generated by the procedure
- have been fetched, you can issue a SELECT @_procname_0, ...
- query using .execute() to get any OUT or INOUT values.
-
- Compatibility warning: The act of calling a stored procedure
- itself creates an empty result set. This appears after any
- result sets generated by the procedure. This is non-standard
- behavior with respect to the DB-API. Be sure to use nextset()
- to advance through all result sets; otherwise you may get
- disconnected.
- """
- conn = self._get_db()
- for index, arg in enumerate(args):
- q = "SET @_%s_%d=%s" % (procname, index, conn.escape(arg))
- if isinstance(q, unicode):
- q = q.encode(conn.charset)
- self._query(q)
- self.nextset()
-
- q = "CALL %s(%s)" % (procname,
- ','.join(['@_%s_%d' % (procname, i)
- for i in range(len(args))]))
- if isinstance(q, unicode):
- q = q.encode(conn.charset)
- self._query(q)
- self._executed = q
-
- return args
-
- def fetchone(self):
- ''' Fetch the next row '''
- self._check_executed()
- if self._rows is None or self.rownumber >= len(self._rows):
- return None
- result = self._rows[self.rownumber]
- self.rownumber += 1
- return result
-
- def fetchmany(self, size=None):
- ''' Fetch several rows '''
- self._check_executed()
- end = self.rownumber + (size or self.arraysize)
- result = self._rows[self.rownumber:end]
- if self._rows is None:
- return None
- self.rownumber = min(end, len(self._rows))
- return result
-
- def fetchall(self):
- ''' Fetch all the rows '''
- self._check_executed()
- if self._rows is None:
- return None
- if self.rownumber:
- result = self._rows[self.rownumber:]
- else:
- result = self._rows
- self.rownumber = len(self._rows)
- return result
-
- def scroll(self, value, mode='relative'):
- self._check_executed()
- if mode == 'relative':
- r = self.rownumber + value
- elif mode == 'absolute':
- r = value
- else:
- self.errorhandler(self, ProgrammingError,
- "unknown scroll mode %s" % mode)
-
- if r < 0 or r >= len(self._rows):
- self.errorhandler(self, IndexError, "out of range")
- self.rownumber = r
-
- def _query(self, q):
- conn = self._get_db()
- self._last_executed = q
- conn.query(q)
- self._do_get_result()
- return self.rowcount
-
- def _do_get_result(self):
- conn = self._get_db()
- self.rowcount = conn._result.affected_rows
-
- self.rownumber = 0
- self.description = conn._result.description
- self.lastrowid = conn._result.insert_id
- self._rows = conn._result.rows
- self._has_next = conn._result.has_next
-
- def __iter__(self):
- return iter(self.fetchone, None)
-
- Warning = Warning
- Error = Error
- InterfaceError = InterfaceError
- DatabaseError = DatabaseError
- DataError = DataError
- OperationalError = OperationalError
- IntegrityError = IntegrityError
- InternalError = InternalError
- ProgrammingError = ProgrammingError
- NotSupportedError = NotSupportedError
-
-class DictCursor(Cursor):
- """A cursor which returns results as a dictionary"""
-
- def execute(self, query, args=None):
- result = super(DictCursor, self).execute(query, args)
- if self.description:
- self._fields = [ field[0] for field in self.description ]
- return result
-
- def fetchone(self):
- ''' Fetch the next row '''
- self._check_executed()
- if self._rows is None or self.rownumber >= len(self._rows):
- return None
- result = dict(zip(self._fields, self._rows[self.rownumber]))
- self.rownumber += 1
- return result
-
- def fetchmany(self, size=None):
- ''' Fetch several rows '''
- self._check_executed()
- if self._rows is None:
- return None
- end = self.rownumber + (size or self.arraysize)
- result = [ dict(zip(self._fields, r)) for r in self._rows[self.rownumber:end] ]
- self.rownumber = min(end, len(self._rows))
- return tuple(result)
-
- def fetchall(self):
- ''' Fetch all the rows '''
- self._check_executed()
- if self._rows is None:
- return None
- if self.rownumber:
- result = [ dict(zip(self._fields, r)) for r in self._rows[self.rownumber:] ]
- else:
- result = [ dict(zip(self._fields, r)) for r in self._rows ]
- self.rownumber = len(self._rows)
- return tuple(result)
-
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/err.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/err.py b/tools/testClient/pymysql/err.py
deleted file mode 100644
index df666a9..0000000
--- a/tools/testClient/pymysql/err.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import struct
-
-
-try:
- StandardError, Warning
-except ImportError:
- try:
- from exceptions import StandardError, Warning
- except ImportError:
- import sys
- e = sys.modules['exceptions']
- StandardError = e.StandardError
- Warning = e.Warning
-
-from constants import ER
-import sys
-
-class MySQLError(StandardError):
-
- """Exception related to operation with MySQL."""
-
-
-class Warning(Warning, MySQLError):
-
- """Exception raised for important warnings like data truncations
- while inserting, etc."""
-
-class Error(MySQLError):
-
- """Exception that is the base class of all other error exceptions
- (not Warning)."""
-
-
-class InterfaceError(Error):
-
- """Exception raised for errors that are related to the database
- interface rather than the database itself."""
-
-
-class DatabaseError(Error):
-
- """Exception raised for errors that are related to the
- database."""
-
-
-class DataError(DatabaseError):
-
- """Exception raised for errors that are due to problems with the
- processed data like division by zero, numeric value out of range,
- etc."""
-
-
-class OperationalError(DatabaseError):
-
- """Exception raised for errors that are related to the database's
- operation and not necessarily under the control of the programmer,
- e.g. an unexpected disconnect occurs, the data source name is not
- found, a transaction could not be processed, a memory allocation
- error occurred during processing, etc."""
-
-
-class IntegrityError(DatabaseError):
-
- """Exception raised when the relational integrity of the database
- is affected, e.g. a foreign key check fails, duplicate key,
- etc."""
-
-
-class InternalError(DatabaseError):
-
- """Exception raised when the database encounters an internal
- error, e.g. the cursor is not valid anymore, the transaction is
- out of sync, etc."""
-
-
-class ProgrammingError(DatabaseError):
-
- """Exception raised for programming errors, e.g. table not found
- or already exists, syntax error in the SQL statement, wrong number
- of parameters specified, etc."""
-
-
-class NotSupportedError(DatabaseError):
-
- """Exception raised in case a method or database API was used
- which is not supported by the database, e.g. requesting a
- .rollback() on a connection that does not support transaction or
- has transactions turned off."""
-
-
-error_map = {}
-
-def _map_error(exc, *errors):
- for error in errors:
- error_map[error] = exc
-
-_map_error(ProgrammingError, ER.DB_CREATE_EXISTS, ER.SYNTAX_ERROR,
- ER.PARSE_ERROR, ER.NO_SUCH_TABLE, ER.WRONG_DB_NAME,
- ER.WRONG_TABLE_NAME, ER.FIELD_SPECIFIED_TWICE,
- ER.INVALID_GROUP_FUNC_USE, ER.UNSUPPORTED_EXTENSION,
- ER.TABLE_MUST_HAVE_COLUMNS, ER.CANT_DO_THIS_DURING_AN_TRANSACTION)
-_map_error(DataError, ER.WARN_DATA_TRUNCATED, ER.WARN_NULL_TO_NOTNULL,
- ER.WARN_DATA_OUT_OF_RANGE, ER.NO_DEFAULT, ER.PRIMARY_CANT_HAVE_NULL,
- ER.DATA_TOO_LONG, ER.DATETIME_FUNCTION_OVERFLOW)
-_map_error(IntegrityError, ER.DUP_ENTRY, ER.NO_REFERENCED_ROW,
- ER.NO_REFERENCED_ROW_2, ER.ROW_IS_REFERENCED, ER.ROW_IS_REFERENCED_2,
- ER.CANNOT_ADD_FOREIGN)
-_map_error(NotSupportedError, ER.WARNING_NOT_COMPLETE_ROLLBACK,
- ER.NOT_SUPPORTED_YET, ER.FEATURE_DISABLED, ER.UNKNOWN_STORAGE_ENGINE)
-_map_error(OperationalError, ER.DBACCESS_DENIED_ERROR, ER.ACCESS_DENIED_ERROR,
- ER.TABLEACCESS_DENIED_ERROR, ER.COLUMNACCESS_DENIED_ERROR)
-
-del _map_error, ER
-
-
-def _get_error_info(data):
- errno = struct.unpack('<h', data[1:3])[0]
- if sys.version_info[0] == 3:
- is_41 = data[3] == ord("#")
- else:
- is_41 = data[3] == "#"
- if is_41:
- # version 4.1
- sqlstate = data[4:9].decode("utf8")
- errorvalue = data[9:].decode("utf8")
- return (errno, sqlstate, errorvalue)
- else:
- # version 4.0
- return (errno, None, data[3:].decode("utf8"))
-
-def _check_mysql_exception(errinfo):
- errno, sqlstate, errorvalue = errinfo
- errorclass = error_map.get(errno, None)
- if errorclass:
- raise errorclass, (errno,errorvalue)
-
- # couldn't find the right error number
- raise InternalError, (errno, errorvalue)
-
-def raise_mysql_exception(data):
- errinfo = _get_error_info(data)
- _check_mysql_exception(errinfo)
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/__init__.py b/tools/testClient/pymysql/tests/__init__.py
deleted file mode 100644
index 569b9a0..0000000
--- a/tools/testClient/pymysql/tests/__init__.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-from pymysql.tests.test_issues import *
-from pymysql.tests.test_example import *
-from pymysql.tests.test_basic import *
-from pymysql.tests.test_DictCursor import *
-
-import sys
-if sys.version_info[0] == 2:
- # MySQLdb tests were designed for Python 3
- from pymysql.tests.thirdparty import *
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/base.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/base.py b/tools/testClient/pymysql/tests/base.py
deleted file mode 100644
index a09eb09..0000000
--- a/tools/testClient/pymysql/tests/base.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import pymysql
-import unittest
-
-class PyMySQLTestCase(unittest.TestCase):
- # Edit this to suit your test environment.
- databases = [
- {"host":"localhost","user":"root",
- "passwd":"","db":"test_pymysql", "use_unicode": True},
- {"host":"localhost","user":"root","passwd":"","db":"test_pymysql2"}]
-
- def setUp(self):
- self.connections = []
-
- for params in self.databases:
- self.connections.append(pymysql.connect(**params))
-
- def tearDown(self):
- for connection in self.connections:
- connection.close()
-
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/test_DictCursor.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/test_DictCursor.py b/tools/testClient/pymysql/tests/test_DictCursor.py
deleted file mode 100644
index e43100b..0000000
--- a/tools/testClient/pymysql/tests/test_DictCursor.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-from pymysql.tests import base
-import pymysql.cursors
-
-import datetime
-
-class TestDictCursor(base.PyMySQLTestCase):
-
- def test_DictCursor(self):
- #all assert test compare to the structure as would come out from MySQLdb
- conn = self.connections[0]
- c = conn.cursor(pymysql.cursors.DictCursor)
- # create a table ane some data to query
- c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")
- data = (("bob",21,"1990-02-06 23:04:56"),
- ("jim",56,"1955-05-09 13:12:45"),
- ("fred",100,"1911-09-12 01:01:01"))
- bob = {'name':'bob','age':21,'DOB':datetime.datetime(1990, 02, 6, 23, 04, 56)}
- jim = {'name':'jim','age':56,'DOB':datetime.datetime(1955, 05, 9, 13, 12, 45)}
- fred = {'name':'fred','age':100,'DOB':datetime.datetime(1911, 9, 12, 1, 1, 1)}
- try:
- c.executemany("insert into dictcursor values (%s,%s,%s)", data)
- # try an update which should return no rows
- c.execute("update dictcursor set age=20 where name='bob'")
- bob['age'] = 20
- # pull back the single row dict for bob and check
- c.execute("SELECT * from dictcursor where name='bob'")
- r = c.fetchone()
- self.assertEqual(bob,r,"fetchone via DictCursor failed")
- # same again, but via fetchall => tuple)
- c.execute("SELECT * from dictcursor where name='bob'")
- r = c.fetchall()
- self.assertEqual((bob,),r,"fetch a 1 row result via fetchall failed via DictCursor")
- # same test again but iterate over the
- c.execute("SELECT * from dictcursor where name='bob'")
- for r in c:
- self.assertEqual(bob, r,"fetch a 1 row result via iteration failed via DictCursor")
- # get all 3 row via fetchall
- c.execute("SELECT * from dictcursor")
- r = c.fetchall()
- self.assertEqual((bob,jim,fred), r, "fetchall failed via DictCursor")
- #same test again but do a list comprehension
- c.execute("SELECT * from dictcursor")
- r = [x for x in c]
- self.assertEqual([bob,jim,fred], r, "list comprehension failed via DictCursor")
- # get all 2 row via fetchmany
- c.execute("SELECT * from dictcursor")
- r = c.fetchmany(2)
- self.assertEqual((bob,jim), r, "fetchmany failed via DictCursor")
- finally:
- c.execute("drop table dictcursor")
-
-__all__ = ["TestDictCursor"]
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/test_basic.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/test_basic.py b/tools/testClient/pymysql/tests/test_basic.py
deleted file mode 100644
index 79908b2..0000000
--- a/tools/testClient/pymysql/tests/test_basic.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-from pymysql.tests import base
-from pymysql import util
-
-import time
-import datetime
-
-class TestConversion(base.PyMySQLTestCase):
- def test_datatypes(self):
- """ test every data type """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_datatypes (b bit, i int, l bigint, f real, s varchar(32), u varchar(32), bb blob, d date, dt datetime, ts timestamp, td time, t time, st datetime)")
- try:
- # insert values
- v = (True, -3, 123456789012, 5.7, "hello'\" world", u"Espa\xc3\xb1ol", "binary\x00data".encode(conn.charset), datetime.date(1988,2,2), datetime.datetime.now(), datetime.timedelta(5,6), datetime.time(16,32), time.localtime())
- c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", v)
- c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes")
- r = c.fetchone()
- self.assertEqual(util.int2byte(1), r[0])
- self.assertEqual(v[1:8], r[1:8])
- # mysql throws away microseconds so we need to check datetimes
- # specially. additionally times are turned into timedeltas.
- self.assertEqual(datetime.datetime(*v[8].timetuple()[:6]), r[8])
- self.assertEqual(v[9], r[9]) # just timedeltas
- self.assertEqual(datetime.timedelta(0, 60 * (v[10].hour * 60 + v[10].minute)), r[10])
- self.assertEqual(datetime.datetime(*v[-1][:6]), r[-1])
-
- c.execute("delete from test_datatypes")
-
- # check nulls
- c.execute("insert into test_datatypes (b,i,l,f,s,u,bb,d,dt,td,t,st) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", [None] * 12)
- c.execute("select b,i,l,f,s,u,bb,d,dt,td,t,st from test_datatypes")
- r = c.fetchone()
- self.assertEqual(tuple([None] * 12), r)
-
- c.execute("delete from test_datatypes")
-
- # check sequence type
- c.execute("insert into test_datatypes (i, l) values (2,4), (6,8), (10,12)")
- c.execute("select l from test_datatypes where i in %s order by i", ((2,6),))
- r = c.fetchall()
- self.assertEqual(((4,),(8,)), r)
- finally:
- c.execute("drop table test_datatypes")
-
- def test_dict(self):
- """ test dict escaping """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_dict (a integer, b integer, c integer)")
- try:
- c.execute("insert into test_dict (a,b,c) values (%(a)s, %(b)s, %(c)s)", {"a":1,"b":2,"c":3})
- c.execute("select a,b,c from test_dict")
- self.assertEqual((1,2,3), c.fetchone())
- finally:
- c.execute("drop table test_dict")
-
- def test_string(self):
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_dict (a text)")
- test_value = "I am a test string"
- try:
- c.execute("insert into test_dict (a) values (%s)", test_value)
- c.execute("select a from test_dict")
- self.assertEqual((test_value,), c.fetchone())
- finally:
- c.execute("drop table test_dict")
-
- def test_integer(self):
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_dict (a integer)")
- test_value = 12345
- try:
- c.execute("insert into test_dict (a) values (%s)", test_value)
- c.execute("select a from test_dict")
- self.assertEqual((test_value,), c.fetchone())
- finally:
- c.execute("drop table test_dict")
-
-
- def test_big_blob(self):
- """ test tons of data """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_big_blob (b blob)")
- try:
- data = "pymysql" * 1024
- c.execute("insert into test_big_blob (b) values (%s)", (data,))
- c.execute("select b from test_big_blob")
- self.assertEqual(data.encode(conn.charset), c.fetchone()[0])
- finally:
- c.execute("drop table test_big_blob")
-
-class TestCursor(base.PyMySQLTestCase):
- # this test case does not work quite right yet, however,
- # we substitute in None for the erroneous field which is
- # compatible with the DB-API 2.0 spec and has not broken
- # any unit tests for anything we've tried.
-
- #def test_description(self):
- # """ test description attribute """
- # # result is from MySQLdb module
- # r = (('Host', 254, 11, 60, 60, 0, 0),
- # ('User', 254, 16, 16, 16, 0, 0),
- # ('Password', 254, 41, 41, 41, 0, 0),
- # ('Select_priv', 254, 1, 1, 1, 0, 0),
- # ('Insert_priv', 254, 1, 1, 1, 0, 0),
- # ('Update_priv', 254, 1, 1, 1, 0, 0),
- # ('Delete_priv', 254, 1, 1, 1, 0, 0),
- # ('Create_priv', 254, 1, 1, 1, 0, 0),
- # ('Drop_priv', 254, 1, 1, 1, 0, 0),
- # ('Reload_priv', 254, 1, 1, 1, 0, 0),
- # ('Shutdown_priv', 254, 1, 1, 1, 0, 0),
- # ('Process_priv', 254, 1, 1, 1, 0, 0),
- # ('File_priv', 254, 1, 1, 1, 0, 0),
- # ('Grant_priv', 254, 1, 1, 1, 0, 0),
- # ('References_priv', 254, 1, 1, 1, 0, 0),
- # ('Index_priv', 254, 1, 1, 1, 0, 0),
- # ('Alter_priv', 254, 1, 1, 1, 0, 0),
- # ('Show_db_priv', 254, 1, 1, 1, 0, 0),
- # ('Super_priv', 254, 1, 1, 1, 0, 0),
- # ('Create_tmp_table_priv', 254, 1, 1, 1, 0, 0),
- # ('Lock_tables_priv', 254, 1, 1, 1, 0, 0),
- # ('Execute_priv', 254, 1, 1, 1, 0, 0),
- # ('Repl_slave_priv', 254, 1, 1, 1, 0, 0),
- # ('Repl_client_priv', 254, 1, 1, 1, 0, 0),
- # ('Create_view_priv', 254, 1, 1, 1, 0, 0),
- # ('Show_view_priv', 254, 1, 1, 1, 0, 0),
- # ('Create_routine_priv', 254, 1, 1, 1, 0, 0),
- # ('Alter_routine_priv', 254, 1, 1, 1, 0, 0),
- # ('Create_user_priv', 254, 1, 1, 1, 0, 0),
- # ('Event_priv', 254, 1, 1, 1, 0, 0),
- # ('Trigger_priv', 254, 1, 1, 1, 0, 0),
- # ('ssl_type', 254, 0, 9, 9, 0, 0),
- # ('ssl_cipher', 252, 0, 65535, 65535, 0, 0),
- # ('x509_issuer', 252, 0, 65535, 65535, 0, 0),
- # ('x509_subject', 252, 0, 65535, 65535, 0, 0),
- # ('max_questions', 3, 1, 11, 11, 0, 0),
- # ('max_updates', 3, 1, 11, 11, 0, 0),
- # ('max_connections', 3, 1, 11, 11, 0, 0),
- # ('max_user_connections', 3, 1, 11, 11, 0, 0))
- # conn = self.connections[0]
- # c = conn.cursor()
- # c.execute("select * from mysql.user")
- #
- # self.assertEqual(r, c.description)
-
- def test_fetch_no_result(self):
- """ test a fetchone() with no rows """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table test_nr (b varchar(32))")
- try:
- data = "pymysql"
- c.execute("insert into test_nr (b) values (%s)", (data,))
- self.assertEqual(None, c.fetchone())
- finally:
- c.execute("drop table test_nr")
-
- def test_aggregates(self):
- """ test aggregate functions """
- conn = self.connections[0]
- c = conn.cursor()
- try:
- c.execute('create table test_aggregates (i integer)')
- for i in xrange(0, 10):
- c.execute('insert into test_aggregates (i) values (%s)', (i,))
- c.execute('select sum(i) from test_aggregates')
- r, = c.fetchone()
- self.assertEqual(sum(range(0,10)), r)
- finally:
- c.execute('drop table test_aggregates')
-
- def test_single_tuple(self):
- """ test a single tuple """
- conn = self.connections[0]
- c = conn.cursor()
- try:
- c.execute("create table mystuff (id integer primary key)")
- c.execute("insert into mystuff (id) values (1)")
- c.execute("insert into mystuff (id) values (2)")
- c.execute("select id from mystuff where id in %s", ((1,),))
- self.assertEqual([(1,)], list(c.fetchall()))
- finally:
- c.execute("drop table mystuff")
-
-__all__ = ["TestConversion","TestCursor"]
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/test_example.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/test_example.py b/tools/testClient/pymysql/tests/test_example.py
deleted file mode 100644
index 0929f26..0000000
--- a/tools/testClient/pymysql/tests/test_example.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import pymysql
-from pymysql.tests import base
-
-class TestExample(base.PyMySQLTestCase):
- def test_example(self):
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
-
-
- cur = conn.cursor()
-
- cur.execute("SELECT Host,User FROM user")
-
- # print cur.description
-
- # r = cur.fetchall()
- # print r
- # ...or...
- u = False
-
- for r in cur.fetchall():
- u = u or conn.user in r
-
- self.assertTrue(u)
-
- cur.close()
- conn.close()
-
-__all__ = ["TestExample"]
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/test_issues.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/test_issues.py b/tools/testClient/pymysql/tests/test_issues.py
deleted file mode 100644
index 5702697..0000000
--- a/tools/testClient/pymysql/tests/test_issues.py
+++ /dev/null
@@ -1,280 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-import pymysql
-from pymysql.tests import base
-
-import sys
-
-try:
- import imp
- reload = imp.reload
-except AttributeError:
- pass
-
-import datetime
-
-class TestOldIssues(base.PyMySQLTestCase):
- def test_issue_3(self):
- """ undefined methods datetime_or_None, date_or_None """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
- try:
- c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None))
- c.execute("select d from issue3")
- self.assertEqual(None, c.fetchone()[0])
- c.execute("select t from issue3")
- self.assertEqual(None, c.fetchone()[0])
- c.execute("select dt from issue3")
- self.assertEqual(None, c.fetchone()[0])
- c.execute("select ts from issue3")
- self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime))
- finally:
- c.execute("drop table issue3")
-
- def test_issue_4(self):
- """ can't retrieve TIMESTAMP fields """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table issue4 (ts timestamp)")
- try:
- c.execute("insert into issue4 (ts) values (now())")
- c.execute("select ts from issue4")
- self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime))
- finally:
- c.execute("drop table issue4")
-
- def test_issue_5(self):
- """ query on information_schema.tables fails """
- con = self.connections[0]
- cur = con.cursor()
- cur.execute("select * from information_schema.tables")
-
- def test_issue_6(self):
- """ exception: TypeError: ord() expected a character, but string of length 0 found """
- conn = pymysql.connect(host="localhost",user="root",passwd="",db="mysql")
- c = conn.cursor()
- c.execute("select * from user")
- conn.close()
-
- def test_issue_8(self):
- """ Primary Key and Index error when selecting data """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
-datetime NOT NULL DEFAULT '0000-00-00 00:00:00', `echeance` int(1) NOT NULL
-DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
-KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""")
- try:
- self.assertEqual(0, c.execute("SELECT * FROM test"))
- c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
- self.assertEqual(0, c.execute("SELECT * FROM test"))
- finally:
- c.execute("drop table test")
-
- def test_issue_9(self):
- """ sets DeprecationWarning in Python 2.6 """
- try:
- reload(pymysql)
- except DeprecationWarning:
- self.fail()
-
- def test_issue_10(self):
- """ Allocate a variable to return when the exception handler is permissive """
- conn = self.connections[0]
- conn.errorhandler = lambda cursor, errorclass, errorvalue: None
- cur = conn.cursor()
- cur.execute( "create table t( n int )" )
- cur.execute( "create table t( n int )" )
-
- def test_issue_13(self):
- """ can't handle large result fields """
- conn = self.connections[0]
- cur = conn.cursor()
- try:
- cur.execute("create table issue13 (t text)")
- # ticket says 18k
- size = 18*1024
- cur.execute("insert into issue13 (t) values (%s)", ("x" * size,))
- cur.execute("select t from issue13")
- # use assert_ so that obscenely huge error messages don't print
- r = cur.fetchone()[0]
- self.assert_("x" * size == r)
- finally:
- cur.execute("drop table issue13")
-
- def test_issue_14(self):
- """ typo in converters.py """
- self.assertEqual('1', pymysql.converters.escape_item(1, "utf8"))
- self.assertEqual('1', pymysql.converters.escape_item(1L, "utf8"))
-
- self.assertEqual('1', pymysql.converters.escape_object(1))
- self.assertEqual('1', pymysql.converters.escape_object(1L))
-
- def test_issue_15(self):
- """ query should be expanded before perform character encoding """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table issue15 (t varchar(32))")
- try:
- c.execute("insert into issue15 (t) values (%s)", (u'\xe4\xf6\xfc',))
- c.execute("select t from issue15")
- self.assertEqual(u'\xe4\xf6\xfc', c.fetchone()[0])
- finally:
- c.execute("drop table issue15")
-
- def test_issue_16(self):
- """ Patch for string and tuple escaping """
- conn = self.connections[0]
- c = conn.cursor()
- c.execute("create table issue16 (name varchar(32) primary key, email varchar(32))")
- try:
- c.execute("insert into issue16 (name, email) values ('pete', 'floydophone')")
- c.execute("select email from issue16 where name=%s", ("pete",))
- self.assertEqual("floydophone", c.fetchone()[0])
- finally:
- c.execute("drop table issue16")
-
- def test_issue_17(self):
- """ could not connect mysql use passwod """
- conn = self.connections[0]
- host = self.databases[0]["host"]
- db = self.databases[0]["db"]
- c = conn.cursor()
- # grant access to a table to a user with a password
- try:
- c.execute("create table issue17 (x varchar(32) primary key)")
- c.execute("insert into issue17 (x) values ('hello, world!')")
- c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
- conn.commit()
-
- conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
- c2 = conn2.cursor()
- c2.execute("select x from issue17")
- self.assertEqual("hello, world!", c2.fetchone()[0])
- finally:
- c.execute("drop table issue17")
-
-def _uni(s, e):
- # hack for py3
- if sys.version_info[0] > 2:
- return unicode(bytes(s, sys.getdefaultencoding()), e)
- else:
- return unicode(s, e)
-
-class TestNewIssues(base.PyMySQLTestCase):
- def test_issue_34(self):
- try:
- pymysql.connect(host="localhost", port=1237, user="root")
- self.fail()
- except pymysql.OperationalError, e:
- self.assertEqual(2003, e.args[0])
- except:
- self.fail()
-
- def test_issue_33(self):
- conn = pymysql.connect(host="localhost", user="root", db=self.databases[0]["db"], charset="utf8")
- c = conn.cursor()
- try:
- c.execute(_uni("create table hei\xc3\x9fe (name varchar(32))", "utf8"))
- c.execute(_uni("insert into hei\xc3\x9fe (name) values ('Pi\xc3\xb1ata')", "utf8"))
- c.execute(_uni("select name from hei\xc3\x9fe", "utf8"))
- self.assertEqual(_uni("Pi\xc3\xb1ata","utf8"), c.fetchone()[0])
- finally:
- c.execute(_uni("drop table hei\xc3\x9fe", "utf8"))
-
- # Will fail without manual intervention:
- #def test_issue_35(self):
- #
- # conn = self.connections[0]
- # c = conn.cursor()
- # print "sudo killall -9 mysqld within the next 10 seconds"
- # try:
- # c.execute("select sleep(10)")
- # self.fail()
- # except pymysql.OperationalError, e:
- # self.assertEqual(2013, e.args[0])
-
- def test_issue_36(self):
- conn = self.connections[0]
- c = conn.cursor()
- # kill connections[0]
- original_count = c.execute("show processlist")
- kill_id = None
- for id,user,host,db,command,time,state,info in c.fetchall():
- if info == "show processlist":
- kill_id = id
- break
- # now nuke the connection
- conn.kill(kill_id)
- # make sure this connection has broken
- try:
- c.execute("show tables")
- self.fail()
- except:
- pass
- # check the process list from the other connection
- self.assertEqual(original_count - 1, self.connections[1].cursor().execute("show processlist"))
- del self.connections[0]
-
- def test_issue_37(self):
- conn = self.connections[0]
- c = conn.cursor()
- self.assertEqual(1, c.execute("SELECT @foo"))
- self.assertEqual((None,), c.fetchone())
- self.assertEqual(0, c.execute("SET @foo = 'bar'"))
- c.execute("set @foo = 'bar'")
-
- def test_issue_38(self):
- conn = self.connections[0]
- c = conn.cursor()
- datum = "a" * 1024 * 1023 # reduced size for most default mysql installs
-
- try:
- c.execute("create table issue38 (id integer, data mediumblob)")
- c.execute("insert into issue38 values (1, %s)", (datum,))
- finally:
- c.execute("drop table issue38")
-
- def disabled_test_issue_54(self):
- conn = self.connections[0]
- c = conn.cursor()
- big_sql = "select * from issue54 where "
- big_sql += " and ".join("%d=%d" % (i,i) for i in xrange(0, 100000))
-
- try:
- c.execute("create table issue54 (id integer primary key)")
- c.execute("insert into issue54 (id) values (7)")
- c.execute(big_sql)
- self.assertEquals(7, c.fetchone()[0])
- finally:
- c.execute("drop table issue54")
-
-class TestGitHubIssues(base.PyMySQLTestCase):
- def test_issue_66(self):
- conn = self.connections[0]
- c = conn.cursor()
- self.assertEquals(0, conn.insert_id())
- try:
- c.execute("create table issue66 (id integer primary key auto_increment, x integer)")
- c.execute("insert into issue66 (x) values (1)")
- c.execute("insert into issue66 (x) values (1)")
- self.assertEquals(2, conn.insert_id())
- finally:
- c.execute("drop table issue66")
-
-__all__ = ["TestOldIssues", "TestNewIssues", "TestGitHubIssues"]
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/thirdparty/__init__.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/thirdparty/__init__.py b/tools/testClient/pymysql/tests/thirdparty/__init__.py
deleted file mode 100644
index 7f108fa..0000000
--- a/tools/testClient/pymysql/tests/thirdparty/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-from test_MySQLdb import *
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/__init__.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/__init__.py b/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/__init__.py
deleted file mode 100644
index 18c638c..0000000
--- a/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-from test_MySQLdb_capabilities import test_MySQLdb as test_capabilities
-from test_MySQLdb_nonstandard import *
-from test_MySQLdb_dbapi20 import test_MySQLdb as test_dbapi2
-
-if __name__ == "__main__":
- import unittest
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/e38db694/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/capabilities.py
----------------------------------------------------------------------
diff --git a/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/capabilities.py b/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/capabilities.py
deleted file mode 100644
index 5f1ae84..0000000
--- a/tools/testClient/pymysql/tests/thirdparty/test_MySQLdb/capabilities.py
+++ /dev/null
@@ -1,304 +0,0 @@
-#!/usr/bin/env python -O
-# Copyright 2012 Citrix Systems, Inc. Licensed under the
-# Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. Citrix Systems, Inc.
-# reserves all rights not expressly granted by the License.
-# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Automatically generated by addcopyright.py at 04/03/2012
-""" Script to test database capabilities and the DB-API interface
- for functionality and memory leaks.
-
- Adapted from a script by M-A Lemburg.
-
-"""
-from time import time
-import array
-import unittest
-
-
-class DatabaseTest(unittest.TestCase):
-
- db_module = None
- connect_args = ()
- connect_kwargs = dict(use_unicode=True, charset="utf8")
- create_table_extra = "ENGINE=INNODB CHARACTER SET UTF8"
- rows = 10
- debug = False
-
- def setUp(self):
- import gc
- db = self.db_module.connect(*self.connect_args, **self.connect_kwargs)
- self.connection = db
- self.cursor = db.cursor()
- self.BLOBText = ''.join([chr(i) for i in range(256)] * 100);
- self.BLOBUText = u''.join([unichr(i) for i in range(16834)])
- self.BLOBBinary = self.db_module.Binary(''.join([chr(i) for i in range(256)] * 16))
-
- leak_test = True
-
- def tearDown(self):
- if self.leak_test:
- import gc
- del self.cursor
- orphans = gc.collect()
- self.assertFalse(orphans, "%d orphaned objects found after deleting cursor" % orphans)
-
- del self.connection
- orphans = gc.collect()
- self.assertFalse(orphans, "%d orphaned objects found after deleting connection" % orphans)
-
- def table_exists(self, name):
- try:
- self.cursor.execute('select * from %s where 1=0' % name)
- except:
- return False
- else:
- return True
-
- def quote_identifier(self, ident):
- return '"%s"' % ident
-
- def new_table_name(self):
- i = id(self.cursor)
- while True:
- name = self.quote_identifier('tb%08x' % i)
- if not self.table_exists(name):
- return name
- i = i + 1
-
- def create_table(self, columndefs):
-
- """ Create a table using a list of column definitions given in
- columndefs.
-
- generator must be a function taking arguments (row_number,
- col_number) returning a suitable data object for insertion
- into the table.
-
- """
- self.table = self.new_table_name()
- self.cursor.execute('CREATE TABLE %s (%s) %s' %
- (self.table,
- ',\n'.join(columndefs),
- self.create_table_extra))
-
- def check_data_integrity(self, columndefs, generator):
- # insert
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- if self.debug:
- print data
- self.cursor.executemany(insert_statement, data)
- self.connection.commit()
- # verify
- self.cursor.execute('select * from %s' % self.table)
- l = self.cursor.fetchall()
- if self.debug:
- print l
- self.assertEquals(len(l), self.rows)
- try:
- for i in range(self.rows):
- for j in range(len(columndefs)):
- self.assertEquals(l[i][j], generator(i,j))
- finally:
- if not self.debug:
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_transactions(self):
- columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
- def generator(row, col):
- if col == 0: return row
- else: return ('%i' % (row%10))*255
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- self.cursor.executemany(insert_statement, data)
- # verify
- self.connection.commit()
- self.cursor.execute('select * from %s' % self.table)
- l = self.cursor.fetchall()
- self.assertEquals(len(l), self.rows)
- for i in range(self.rows):
- for j in range(len(columndefs)):
- self.assertEquals(l[i][j], generator(i,j))
- delete_statement = 'delete from %s where col1=%%s' % self.table
- self.cursor.execute(delete_statement, (0,))
- self.cursor.execute('select col1 from %s where col1=%s' % \
- (self.table, 0))
- l = self.cursor.fetchall()
- self.assertFalse(l, "DELETE didn't work")
- self.connection.rollback()
- self.cursor.execute('select col1 from %s where col1=%s' % \
- (self.table, 0))
- l = self.cursor.fetchall()
- self.assertTrue(len(l) == 1, "ROLLBACK didn't work")
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_truncation(self):
- columndefs = ( 'col1 INT', 'col2 VARCHAR(255)')
- def generator(row, col):
- if col == 0: return row
- else: return ('%i' % (row%10))*((255-self.rows/2)+row)
- self.create_table(columndefs)
- insert_statement = ('INSERT INTO %s VALUES (%s)' %
- (self.table,
- ','.join(['%s'] * len(columndefs))))
-
- try:
- self.cursor.execute(insert_statement, (0, '0'*256))
- except Warning:
- if self.debug: print self.cursor.messages
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long column did not generate warnings/exception with single insert")
-
- self.connection.rollback()
-
- try:
- for i in range(self.rows):
- data = []
- for j in range(len(columndefs)):
- data.append(generator(i,j))
- self.cursor.execute(insert_statement,tuple(data))
- except Warning:
- if self.debug: print self.cursor.messages
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long columns did not generate warnings/exception with execute()")
-
- self.connection.rollback()
-
- try:
- data = [ [ generator(i,j) for j in range(len(columndefs)) ]
- for i in range(self.rows) ]
- self.cursor.executemany(insert_statement, data)
- except Warning:
- if self.debug: print self.cursor.messages
- except self.connection.DataError:
- pass
- else:
- self.fail("Over-long columns did not generate warnings/exception with executemany()")
-
- self.connection.rollback()
- self.cursor.execute('drop table %s' % (self.table))
-
- def test_CHAR(self):
- # Character data
- def generator(row,col):
- return ('%i' % ((row+col) % 10)) * 255
- self.check_data_integrity(
- ('col1 char(255)','col2 char(255)'),
- generator)
-
- def test_INT(self):
- # Number data
- def generator(row,col):
- return row*row
- self.check_data_integrity(
- ('col1 INT',),
- generator)
-
- def test_DECIMAL(self):
- # DECIMAL
- def generator(row,col):
- from decimal import Decimal
- return Decimal("%d.%02d" % (row, col))
- self.check_data_integrity(
- ('col1 DECIMAL(5,2)',),
- generator)
-
- def test_DATE(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.DateFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 DATE',),
- generator)
-
- def test_TIME(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimeFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 TIME',),
- generator)
-
- def test_DATETIME(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 DATETIME',),
- generator)
-
- def test_TIMESTAMP(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313)
- self.check_data_integrity(
- ('col1 TIMESTAMP',),
- generator)
-
- def test_fractional_TIMESTAMP(self):
- ticks = time()
- def generator(row,col):
- return self.db_module.TimestampFromTicks(ticks+row*86400-col*1313+row*0.7*col/3.0)
- self.check_data_integrity(
- ('col1 TIMESTAMP',),
- generator)
-
- def test_LONG(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBUText # 'BLOB Text ' * 1024
- self.check_data_integrity(
- ('col1 INT', 'col2 LONG'),
- generator)
-
- def test_TEXT(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBUText[:5192] # 'BLOB Text ' * 1024
- self.check_data_integrity(
- ('col1 INT', 'col2 TEXT'),
- generator)
-
- def test_LONG_BYTE(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBBinary # 'BLOB\000Binary ' * 1024
- self.check_data_integrity(
- ('col1 INT','col2 LONG BYTE'),
- generator)
-
- def test_BLOB(self):
- def generator(row,col):
- if col == 0:
- return row
- else:
- return self.BLOBBinary # 'BLOB\000Binary ' * 1024
- self.check_data_integrity(
- ('col1 INT','col2 BLOB'),
- generator)
-