You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/07/19 18:20:11 UTC
[phoenix-queryserver] branch master updated: PHOENIX-5999 Hack
together a better implementation for executemany using ExecuteBatch
This is an automated email from the ASF dual-hosted git repository.
stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git
The following commit(s) were added to refs/heads/master by this push:
new 4f4b25c PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch
4f4b25c is described below
commit 4f4b25c34d74544c7a7f3f43538586d7a81a221c
Author: Josh Elser <el...@apache.org>
AuthorDate: Thu Jul 9 21:03:10 2020 -0400
PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch
Closes #43
---
python-phoenixdb/phoenixdb/avatica/client.py | 33 ++++++++++++++++++++++++
python-phoenixdb/phoenixdb/cursor.py | 20 +++++++-------
python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py | 2 +-
3 files changed, 43 insertions(+), 12 deletions(-)
diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index 9b28e65..a1f502c 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -508,6 +508,39 @@ class AvaticaClient(object):
response.ParseFromString(response_data)
return response.results
+ def execute_batch(self, connection_id, statement_id, rows):
+ """Returns an array of update counts corresponding to each row written.
+
+ :param connection_id:
+ ID of the current connection.
+
+ :param statement_id:
+ ID of the statement to fetch rows from.
+
+ :param rows:
+ A list of lists corresponding to the columns to bind to the statement
+ for many rows.
+
+ :returns:
+ Update counts for the writes.
+ """
+ request = requests_pb2.ExecuteBatchRequest()
+ request.statement_id = statement_id
+ request.connection_id = connection_id
+ if rows is not None:
+ for row in rows:
+ batch = requests_pb2.UpdateBatch()
+ for col in row:
+ batch.parameter_values.append(col)
+ request.updates.append(batch)
+
+ response_data = self._apply(request)
+ response = responses_pb2.ExecuteBatchResponse()
+ response.ParseFromString(response_data)
+ if response.missing_statement:
+ raise errors.DatabaseError('ExecuteBatch reported missing statement', -1)
+ return response.update_counts
+
def fetch(self, connection_id, statement_id, offset=0, frame_max_size=None):
"""Returns a frame of rows.
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index 5521ebc..ad09106 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -93,7 +93,7 @@ class Cursor(object):
be automatically called at the end of the ``with`` block.
"""
if self._closed:
- raise ProgrammingError('the cursor is already closed')
+ raise ProgrammingError('The cursor is already closed.')
if self._id is not None:
self._connection._client.close_statement(self._connection._id, self._id)
self._id = None
@@ -153,7 +153,7 @@ class Cursor(object):
if frame.rows:
self._pos = 0
elif not frame.done:
- raise InternalError('got an empty frame, but the statement is not done yet')
+ raise InternalError('Got an empty frame, but the statement is not done yet.')
def _fetch_next_frame(self):
offset = self._frame.offset + len(self._frame.rows)
@@ -199,7 +199,7 @@ class Cursor(object):
typed_value.type = common_pb2.ARRAY
typed_value.component_type = rep
else:
- raise ProgrammingError('scalar value specified for array parameter')
+ raise ProgrammingError('Scalar value specified for array parameter.')
else:
if mutate_to is not None:
value = mutate_to(value)
@@ -211,7 +211,7 @@ class Cursor(object):
def execute(self, operation, parameters=None):
if self._closed:
- raise ProgrammingError('the cursor is already closed')
+ raise ProgrammingError('The cursor is already closed.')
self._updatecount = -1
self._set_frame(None)
if parameters is None:
@@ -235,18 +235,16 @@ class Cursor(object):
def executemany(self, operation, seq_of_parameters):
if self._closed:
- raise ProgrammingError('the cursor is already closed')
+ raise ProgrammingError('The cursor is already closed.')
self._updatecount = -1
self._set_frame(None)
statement = self._connection._client.prepare(
self._connection._id, operation, max_rows_total=0)
self._set_id(statement.id)
self._set_signature(statement.signature)
- for parameters in seq_of_parameters:
- self._connection._client.execute(
- self._connection._id, self._id,
- statement.signature, self._transform_parameters(parameters),
- first_frame_max_size=0)
+ return self._connection._client.execute_batch(
+ self._connection._id, self._id,
+ [self._transform_parameters(p) for p in seq_of_parameters])
def _transform_row(self, row):
"""Transforms a Row into Python values.
@@ -291,7 +289,7 @@ class Cursor(object):
def fetchone(self):
if self._frame is None:
- raise ProgrammingError('no select statement was executed')
+ raise ProgrammingError('No select statement was executed.')
if self._pos is None:
return None
rows = self._frame.rows
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index 61cb4b8..b402322 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -35,7 +35,7 @@ class PhoenixDDLCompiler(DDLCompiler):
def visit_primary_key_constraint(self, constraint):
if constraint.name is None:
- raise CompileError("can't create primary key without a name")
+ raise CompileError("Can't create primary key without a name.")
return DDLCompiler.visit_primary_key_constraint(self, constraint)