You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by st...@apache.org on 2020/07/19 18:20:11 UTC

[phoenix-queryserver] branch master updated: PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4b25c  PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch
4f4b25c is described below

commit 4f4b25c34d74544c7a7f3f43538586d7a81a221c
Author: Josh Elser <el...@apache.org>
AuthorDate: Thu Jul 9 21:03:10 2020 -0400

    PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch
    
    Closes #43
---
 python-phoenixdb/phoenixdb/avatica/client.py     | 33 ++++++++++++++++++++++++
 python-phoenixdb/phoenixdb/cursor.py             | 20 +++++++-------
 python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py |  2 +-
 3 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index 9b28e65..a1f502c 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -508,6 +508,39 @@ class AvaticaClient(object):
         response.ParseFromString(response_data)
         return response.results
 
+    def execute_batch(self, connection_id, statement_id, rows):
+        """Returns an array of update counts corresponding to each row written.
+
+        :param connection_id:
+            ID of the current connection.
+
+        :param statement_id:
+            ID of the statement to fetch rows from.
+
+        :param rows:
+            A list of lists corresponding to the columns to bind to the statement
+            for many rows.
+
+        :returns:
+            Update counts for the writes.
+        """
+        request = requests_pb2.ExecuteBatchRequest()
+        request.statement_id = statement_id
+        request.connection_id = connection_id
+        if rows is not None:
+            for row in rows:
+                batch = requests_pb2.UpdateBatch()
+                for col in row:
+                    batch.parameter_values.append(col)
+                request.updates.append(batch)
+
+        response_data = self._apply(request)
+        response = responses_pb2.ExecuteBatchResponse()
+        response.ParseFromString(response_data)
+        if response.missing_statement:
+            raise errors.DatabaseError('ExecuteBatch reported missing statement', -1)
+        return response.update_counts
+
     def fetch(self, connection_id, statement_id, offset=0, frame_max_size=None):
         """Returns a frame of rows.
 
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index 5521ebc..ad09106 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -93,7 +93,7 @@ class Cursor(object):
         be automatically called at the end of the ``with`` block.
         """
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         if self._id is not None:
             self._connection._client.close_statement(self._connection._id, self._id)
             self._id = None
@@ -153,7 +153,7 @@ class Cursor(object):
             if frame.rows:
                 self._pos = 0
             elif not frame.done:
-                raise InternalError('got an empty frame, but the statement is not done yet')
+                raise InternalError('Got an empty frame, but the statement is not done yet.')
 
     def _fetch_next_frame(self):
         offset = self._frame.offset + len(self._frame.rows)
@@ -199,7 +199,7 @@ class Cursor(object):
                         typed_value.type = common_pb2.ARRAY
                         typed_value.component_type = rep
                     else:
-                        raise ProgrammingError('scalar value specified for array parameter')
+                        raise ProgrammingError('Scalar value specified for array parameter.')
                 else:
                     if mutate_to is not None:
                         value = mutate_to(value)
@@ -211,7 +211,7 @@ class Cursor(object):
 
     def execute(self, operation, parameters=None):
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         self._updatecount = -1
         self._set_frame(None)
         if parameters is None:
@@ -235,18 +235,16 @@ class Cursor(object):
 
     def executemany(self, operation, seq_of_parameters):
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         self._updatecount = -1
         self._set_frame(None)
         statement = self._connection._client.prepare(
             self._connection._id, operation, max_rows_total=0)
         self._set_id(statement.id)
         self._set_signature(statement.signature)
-        for parameters in seq_of_parameters:
-            self._connection._client.execute(
-                self._connection._id, self._id,
-                statement.signature, self._transform_parameters(parameters),
-                first_frame_max_size=0)
+        return self._connection._client.execute_batch(
+            self._connection._id, self._id,
+            [self._transform_parameters(p) for p in seq_of_parameters])
 
     def _transform_row(self, row):
         """Transforms a Row into Python values.
@@ -291,7 +289,7 @@ class Cursor(object):
 
     def fetchone(self):
         if self._frame is None:
-            raise ProgrammingError('no select statement was executed')
+            raise ProgrammingError('No select statement was executed.')
         if self._pos is None:
             return None
         rows = self._frame.rows
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index 61cb4b8..b402322 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -35,7 +35,7 @@ class PhoenixDDLCompiler(DDLCompiler):
 
     def visit_primary_key_constraint(self, constraint):
         if constraint.name is None:
-            raise CompileError("can't create primary key without a name")
+            raise CompileError("Can't create primary key without a name.")
         return DDLCompiler.visit_primary_key_constraint(self, constraint)