You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/26 14:59:37 UTC

[arrow-adbc] branch main updated: [Python] Implement partitioned data interface (#80)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e3c0c10  [Python] Implement partitioned data interface (#80)
e3c0c10 is described below

commit e3c0c10a564ccdcc5b51d2a069ca1d139b06dff3
Author: David Li <li...@gmail.com>
AuthorDate: Fri Aug 26 10:59:32 2022 -0400

    [Python] Implement partitioned data interface (#80)
---
 c/vendor/nanoarrow/nanoarrow.c                     |  17 --
 c/vendor/nanoarrow/nanoarrow.h                     |   5 +-
 c/vendor/vendor_nanoarrow.sh                       |   8 +-
 .../adbc_driver_manager/_lib.pyx                   | 213 +++++++++++++++------
 .../adbc_driver_manager/dbapi.py                   |  44 ++++-
 python/adbc_driver_manager/poetry.lock             |  63 +++---
 python/adbc_driver_manager/pyproject.toml          |   6 +-
 7 files changed, 236 insertions(+), 120 deletions(-)

diff --git a/c/vendor/nanoarrow/nanoarrow.c b/c/vendor/nanoarrow/nanoarrow.c
index 693fbf7..35f7ac3 100644
--- a/c/vendor/nanoarrow/nanoarrow.c
+++ b/c/vendor/nanoarrow/nanoarrow.c
@@ -163,11 +163,6 @@ void* ArrowRealloc(void* ptr, int64_t size) { return realloc(ptr, size); }
 
 void ArrowFree(void* ptr) { free(ptr); }
 
-static uint8_t* ArrowBufferAllocatorMallocAllocate(struct ArrowBufferAllocator* allocator,
-                                                   int64_t size) {
-  return ArrowMalloc(size);
-}
-
 static uint8_t* ArrowBufferAllocatorMallocReallocate(
     struct ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t old_size,
     int64_t new_size) {
@@ -186,11 +181,6 @@ struct ArrowBufferAllocator ArrowBufferAllocatorDefault() {
   return ArrowBufferAllocatorMalloc;
 }
 
-static uint8_t* ArrowBufferAllocatorNeverAllocate(struct ArrowBufferAllocator* allocator,
-                                                  int64_t size) {
-  return NULL;
-}
-
 static uint8_t* ArrowBufferAllocatorNeverReallocate(
     struct ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t old_size,
     int64_t new_size) {
@@ -1362,7 +1352,6 @@ static ArrowErrorCode ArrowMetadataGetValueInternal(const char* metadata,
   struct ArrowStringView existing_value;
   ArrowMetadataReaderInit(&reader, metadata);
 
-  int64_t size = sizeof(int32_t);
   while (ArrowMetadataReaderRead(&reader, &existing_key, &existing_value) ==
          NANOARROW_OK) {
     int key_equal = key->n_bytes == existing_key.n_bytes &&
@@ -1835,9 +1824,6 @@ static ArrowErrorCode ArrowArrayReserveInternal(struct ArrowArray* array,
 
 ArrowErrorCode ArrowArrayReserve(struct ArrowArray* array,
                                  int64_t additional_size_elements) {
-  struct ArrowArrayPrivateData* private_data =
-      (struct ArrowArrayPrivateData*)array->private_data;
-
   struct ArrowArrayView array_view;
   NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromArray(&array_view, array));
 
@@ -1902,9 +1888,6 @@ static ArrowErrorCode ArrowArrayCheckInternalBufferSizes(
 
 ArrowErrorCode ArrowArrayFinishBuilding(struct ArrowArray* array,
                                         struct ArrowError* error) {
-  struct ArrowArrayPrivateData* private_data =
-      (struct ArrowArrayPrivateData*)array->private_data;
-
   // Make sure the value we get with array->buffers[i] is set to the actual
   // pointer (which may have changed from the original due to reallocation)
   ArrowArrayFlushInternalPointers(array);
diff --git a/c/vendor/nanoarrow/nanoarrow.h b/c/vendor/nanoarrow/nanoarrow.h
index 9f8cd63..daf4d6d 100644
--- a/c/vendor/nanoarrow/nanoarrow.h
+++ b/c/vendor/nanoarrow/nanoarrow.h
@@ -362,7 +362,7 @@ struct ArrowArrayView {
 #ifndef NANOARROW_BUILD_ID_H_INCLUDED
 #define NANOARROW_BUILD_ID_H_INCLUDED
 
-#define NANOARROW_BUILD_ID "OFF"
+#define NANOARROW_BUILD_ID "gha6953efeb0657c72fd66af8f34be66e0a3ed0e26a"
 
 #endif
 // Licensed to the Apache Software Foundation (ASF) under one
@@ -1660,9 +1660,6 @@ static inline ArrowErrorCode ArrowArrayStartAppending(struct ArrowArray* array)
 }
 
 static inline ArrowErrorCode ArrowArrayShrinkToFit(struct ArrowArray* array) {
-  struct ArrowArrayPrivateData* private_data =
-      (struct ArrowArrayPrivateData*)array->private_data;
-
   for (int64_t i = 0; i < 3; i++) {
     struct ArrowBuffer* buffer = ArrowArrayBuffer(array, i);
     NANOARROW_RETURN_NOT_OK(ArrowBufferResize(buffer, buffer->size_bytes, 1));
diff --git a/c/vendor/vendor_nanoarrow.sh b/c/vendor/vendor_nanoarrow.sh
index c3a9943..bc50743 100755
--- a/c/vendor/vendor_nanoarrow.sh
+++ b/c/vendor/vendor_nanoarrow.sh
@@ -31,13 +31,9 @@ main() {
     rm -rf nanoarrow
     mkdir -p nanoarrow
     tar --strip-components 1 -C "$SCRATCH" -xf "$tarball"
-    mkdir "$SCRATCH/build"
-    pushd "$SCRATCH/build"
-    cmake .. -DNANOARROW_BUNDLE=ON
-    popd
 
-    cp "$SCRATCH/build/amalgamation/nanoarrow/nanoarrow.c" nanoarrow/
-    cp "$SCRATCH/build/amalgamation/nanoarrow/nanoarrow.h" nanoarrow/
+    cp "$SCRATCH/dist/nanoarrow.c" nanoarrow/
+    cp "$SCRATCH/dist/nanoarrow.h" nanoarrow/
 }
 
 main "$@"
diff --git a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
index 45bb0c9..ab2da54 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
+++ b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
@@ -24,6 +24,7 @@ import typing
 from typing import List, Tuple
 
 import cython
+from cpython.bytes cimport PyBytes_FromStringAndSize
 from libc.stdint cimport int32_t, int64_t, uint8_t, uint32_t, uintptr_t
 from libc.string cimport memset
 from libcpp.vector cimport vector as c_vector
@@ -74,10 +75,6 @@ cdef extern from "adbc.h" nogil:
     cdef int ADBC_OBJECT_DEPTH_TABLES
     cdef int ADBC_OBJECT_DEPTH_COLUMNS
 
-    cdef int ADBC_OUTPUT_TYPE_ARROW
-    cdef int ADBC_OUTPUT_TYPE_PARTITIONS
-    cdef int ADBC_OUTPUT_TYPE_UPDATE
-
     cdef uint32_t ADBC_INFO_VENDOR_NAME
     cdef uint32_t ADBC_INFO_VENDOR_VERSION
     cdef uint32_t ADBC_INFO_VENDOR_ARROW_VERSION
@@ -86,6 +83,7 @@ cdef extern from "adbc.h" nogil:
     cdef uint32_t ADBC_INFO_DRIVER_ARROW_VERSION
 
     ctypedef void (*CAdbcErrorRelease)(CAdbcError*)
+    ctypedef void (*CAdbcPartitionsRelease)(CAdbcPartitions*)
 
     cdef struct CAdbcError"AdbcError":
         char* message
@@ -102,6 +100,13 @@ cdef extern from "adbc.h" nogil:
     cdef struct CAdbcStatement"AdbcStatement":
         void* private_data
 
+    cdef struct CAdbcPartitions"AdbcPartitions":
+        size_t num_partitions
+        const uint8_t** partitions
+        const size_t* partition_lengths
+        void* private_data
+        CAdbcPartitionsRelease release
+
     CAdbcStatusCode AdbcDatabaseNew(CAdbcDatabase* database, CAdbcError* error)
     CAdbcStatusCode AdbcDatabaseSetOption(
         CAdbcDatabase* database,
@@ -175,6 +180,11 @@ cdef extern from "adbc.h" nogil:
         CAdbcStatement* statement,
         CArrowArrayStream*,
         CAdbcError* error)
+    CAdbcStatusCode AdbcStatementExecutePartitions(
+        CAdbcStatement* statement,
+        CArrowSchema* schema, CAdbcPartitions* partitions,
+        int64_t* rows_affected,
+        CAdbcError* error)
     CAdbcStatusCode AdbcStatementExecuteQuery(
         CAdbcStatement* statement,
         CArrowArrayStream* out, int64_t* rows_affected,
@@ -443,7 +453,8 @@ cdef class AdbcDatabase(_AdbcHandle):
         cdef const char* c_value
         memset(&self.database, 0, cython.sizeof(CAdbcDatabase))
 
-        status = AdbcDatabaseNew(&self.database, &c_error)
+        with nogil:
+            status = AdbcDatabaseNew(&self.database, &c_error)
         check_error(status, &c_error)
 
         for key, value in kwargs.items():
@@ -454,7 +465,8 @@ cdef class AdbcDatabase(_AdbcHandle):
             status = AdbcDatabaseSetOption(&self.database, c_key, c_value, &c_error)
             check_error(status, &c_error)
 
-        status = AdbcDatabaseInit(&self.database, &c_error)
+        with nogil:
+            status = AdbcDatabaseInit(&self.database, &c_error)
         check_error(status, &c_error)
 
     def close(self) -> None:
@@ -463,7 +475,9 @@ cdef class AdbcDatabase(_AdbcHandle):
             return
 
         cdef CAdbcError c_error = empty_error()
-        cdef CAdbcStatusCode status = AdbcDatabaseRelease(&self.database, &c_error)
+        cdef CAdbcStatusCode status
+        with nogil:
+            status = AdbcDatabaseRelease(&self.database, &c_error)
         check_error(status, &c_error)
 
 
@@ -494,7 +508,8 @@ cdef class AdbcConnection(_AdbcHandle):
         self.database = database
         memset(&self.connection, 0, cython.sizeof(CAdbcConnection))
 
-        status = AdbcConnectionNew(&self.connection, &c_error)
+        with nogil:
+            status = AdbcConnectionNew(&self.connection, &c_error)
         check_error(status, &c_error)
 
         for key, value in kwargs.items():
@@ -505,13 +520,17 @@ cdef class AdbcConnection(_AdbcHandle):
             status = AdbcConnectionSetOption(&self.connection, c_key, c_value, &c_error)
             check_error(status, &c_error)
 
-        status = AdbcConnectionInit(&self.connection, &database.database, &c_error)
+        with nogil:
+            status = AdbcConnectionInit(&self.connection, &database.database, &c_error)
         check_error(status, &c_error)
 
     def commit(self) -> None:
         """Commit the current transaction."""
         cdef CAdbcError c_error = empty_error()
-        check_error(AdbcConnectionCommit(&self.connection, &c_error), &c_error)
+        cdef CAdbcStatusCode status
+        with nogil:
+            status = AdbcConnectionCommit(&self.connection, &c_error)
+        check_error(status, &c_error)
 
     def get_info(self, info_codes=None) -> ArrowArrayStreamHandle:
         """
@@ -529,19 +548,21 @@ cdef class AdbcConnection(_AdbcHandle):
                 else:
                     c_info_codes.push_back(info_code.value)
 
-            status = AdbcConnectionGetInfo(
-                &self.connection,
-                c_info_codes.data(),
-                c_info_codes.size(),
-                &stream.stream,
-                &c_error)
+            with nogil:
+                status = AdbcConnectionGetInfo(
+                    &self.connection,
+                    c_info_codes.data(),
+                    c_info_codes.size(),
+                    &stream.stream,
+                    &c_error)
         else:
-            status = AdbcConnectionGetInfo(
-                &self.connection,
-                NULL,
-                0,
-                &stream.stream,
-                &c_error)
+            with nogil:
+                status = AdbcConnectionGetInfo(
+                    &self.connection,
+                    NULL,
+                    0,
+                    &stream.stream,
+                    &c_error)
 
         check_error(status, &c_error)
         return stream
@@ -554,6 +575,7 @@ cdef class AdbcConnection(_AdbcHandle):
         cdef CAdbcError c_error = empty_error()
         cdef CAdbcStatusCode status
         cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
+        cdef int c_depth = GetObjectsDepth(depth).value
 
         cdef char* c_catalog = NULL
         if catalog is not None:
@@ -575,16 +597,17 @@ cdef class AdbcConnection(_AdbcHandle):
             column_name = _to_bytes(column_name, "column_name")
             c_column_name = column_name
 
-        status = AdbcConnectionGetObjects(
-            &self.connection,
-            GetObjectsDepth(depth).value,
-            c_catalog,
-            c_db_schema,
-            c_table_name,
-            NULL,  # TODO: support table_types
-            c_column_name,
-            &stream.stream,
-            &c_error)
+        with nogil:
+            status = AdbcConnectionGetObjects(
+                &self.connection,
+                c_depth,
+                c_catalog,
+                c_db_schema,
+                c_table_name,
+                NULL,  # TODO: support table_types
+                c_column_name,
+                &stream.stream,
+                &c_error)
         check_error(status, &c_error)
 
         return stream
@@ -601,6 +624,8 @@ cdef class AdbcConnection(_AdbcHandle):
         cdef CAdbcError c_error = empty_error()
         cdef CAdbcStatusCode status
         cdef ArrowSchemaHandle handle = ArrowSchemaHandle()
+        table_name = _to_bytes(table_name, "table_name")
+        cdef char* c_table_name = table_name
 
         cdef char* c_catalog = NULL
         if catalog is not None:
@@ -612,15 +637,15 @@ cdef class AdbcConnection(_AdbcHandle):
             db_schema = _to_bytes(db_schema, "db_schema")
             c_db_schema = db_schema
 
-        status = AdbcConnectionGetTableSchema(
-            &self.connection,
-            c_catalog,
-            c_db_schema,
-            _to_bytes(table_name, "table_name"),
-            &handle.schema,
-            &c_error)
+        with nogil:
+            status = AdbcConnectionGetTableSchema(
+                &self.connection,
+                c_catalog,
+                c_db_schema,
+                c_table_name,
+                &handle.schema,
+                &c_error)
         check_error(status, &c_error)
-
         return handle
 
     def get_table_types(self) -> ArrowArrayStreamHandle:
@@ -631,10 +656,28 @@ cdef class AdbcConnection(_AdbcHandle):
         cdef CAdbcStatusCode status
         cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
 
-        status = AdbcConnectionGetTableTypes(
-            &self.connection, &stream.stream, &c_error)
+        with nogil:
+            status = AdbcConnectionGetTableTypes(
+                &self.connection, &stream.stream, &c_error)
         check_error(status, &c_error)
+        return stream
 
+    def read_partition(self, bytes partition not None) -> ArrowArrayStreamHandle:
+        """Fetch a single partition from execute_partitions."""
+        cdef CAdbcError c_error = empty_error()
+        cdef CAdbcStatusCode status
+        cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
+        cdef const uint8_t* data = <const uint8_t*> partition
+        cdef size_t length = len(partition)
+
+        with nogil:
+            status = AdbcConnectionReadPartition(
+                &self.connection,
+                data,
+                length,
+                &stream.stream,
+                &c_error)
+        check_error(status, &c_error)
         return stream
 
     def rollback(self) -> None:
@@ -649,11 +692,13 @@ cdef class AdbcConnection(_AdbcHandle):
             value = ADBC_OPTION_VALUE_ENABLED
         else:
             value = ADBC_OPTION_VALUE_DISABLED
-        status = AdbcConnectionSetOption(
-            &self.connection,
-            ADBC_CONNECTION_OPTION_AUTOCOMMIT,
-            value,
-            &c_error)
+
+        with nogil:
+            status = AdbcConnectionSetOption(
+                &self.connection,
+                ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                value,
+                &c_error)
         check_error(status, &c_error)
 
     def close(self) -> None:
@@ -662,7 +707,10 @@ cdef class AdbcConnection(_AdbcHandle):
             return
 
         cdef CAdbcError c_error = empty_error()
-        cdef CAdbcStatusCode status = AdbcConnectionRelease(&self.connection, &c_error)
+        cdef CAdbcStatusCode status
+
+        with nogil:
+            status = AdbcConnectionRelease(&self.connection, &c_error)
         check_error(status, &c_error)
 
 
@@ -685,7 +733,11 @@ cdef class AdbcStatement(_AdbcHandle):
         cdef CAdbcError c_error = empty_error()
         memset(&self.statement, 0, cython.sizeof(CAdbcStatement))
 
-        status = AdbcStatementNew(&connection.connection, &self.statement, &c_error)
+        with nogil:
+            status = AdbcStatementNew(
+                &connection.connection,
+                &self.statement,
+                &c_error)
         check_error(status, &c_error)
 
     def bind(self, data, schema) -> None:
@@ -716,7 +768,12 @@ cdef class AdbcStatement(_AdbcHandle):
             raise TypeError(f"schema must be int or ArrowSchemaHandle, "
                             f"not {type(schema)}")
 
-        status = AdbcStatementBind(&self.statement, c_array, c_schema, &c_error)
+        with nogil:
+            status = AdbcStatementBind(
+                &self.statement,
+                c_array,
+                c_schema,
+                &c_error)
         check_error(status, &c_error)
 
     def bind_stream(self, stream) -> None:
@@ -738,7 +795,11 @@ cdef class AdbcStatement(_AdbcHandle):
             raise TypeError(f"data must be int or ArrowArrayStreamHandle, "
                             f"not {type(stream)}")
 
-        status = AdbcStatementBindStream(&self.statement, c_stream, &c_error)
+        with nogil:
+            status = AdbcStatementBindStream(
+                &self.statement,
+                c_stream,
+                &c_error)
         check_error(status, &c_error)
 
     def close(self) -> None:
@@ -746,7 +807,9 @@ cdef class AdbcStatement(_AdbcHandle):
             return
 
         cdef CAdbcError c_error = empty_error()
-        cdef CAdbcStatusCode status = AdbcStatementRelease(&self.statement, &c_error)
+        cdef CAdbcStatusCode status
+        with nogil:
+            status = AdbcStatementRelease(&self.statement, &c_error)
         check_error(status, &c_error)
 
     def execute_query(self) -> Tuple[ArrowArrayStreamHandle, int]:
@@ -787,8 +850,29 @@ cdef class AdbcStatement(_AdbcHandle):
         int
             The number of rows if known, else -1.
         """
-        # TODO(lidavidm): apache/arrow-adbc#68
-        raise NotImplementedError
+        cdef CAdbcError c_error = empty_error()
+        cdef ArrowSchemaHandle schema = ArrowSchemaHandle()
+        cdef CAdbcPartitions c_partitions = CAdbcPartitions(
+            0, NULL, NULL, NULL, NULL)
+        cdef int64_t rows_affected = 0
+
+        with nogil:
+            status = AdbcStatementExecutePartitions(
+                &self.statement,
+                &schema.schema,
+                &c_partitions,
+                &rows_affected,
+                &c_error)
+        check_error(status, &c_error)
+
+        partitions = []
+        for i in range(c_partitions.num_partitions):
+            length = c_partitions.partition_lengths[i]
+            data = <const char*> c_partitions.partitions[i]
+            partitions.append(PyBytes_FromStringAndSize(data, length))
+        c_partitions.release(&c_partitions)
+
+        return (partitions, schema, rows_affected)
 
     def execute_update(self) -> int:
         """
@@ -812,7 +896,8 @@ cdef class AdbcStatement(_AdbcHandle):
     def prepare(self) -> None:
         """Turn this statement into a prepared statement."""
         cdef CAdbcError c_error = empty_error()
-        status = AdbcStatementPrepare(&self.statement, &c_error)
+        with nogil:
+            status = AdbcStatementPrepare(&self.statement, &c_error)
         check_error(status, &c_error)
 
     def set_options(self, **kwargs) -> None:
@@ -827,16 +912,22 @@ cdef class AdbcStatement(_AdbcHandle):
                 &self.statement, c_key, c_value, &c_error)
             check_error(status, &c_error)
 
-    def set_sql_query(self, query: str) -> None:
+    def set_sql_query(self, str query not None) -> None:
         """Set a SQL query to be executed."""
         cdef CAdbcError c_error = empty_error()
-        status = AdbcStatementSetSqlQuery(
-            &self.statement, query.encode("utf-8"), &c_error)
+        cdef bytes query_data = query.encode("utf-8")
+        cdef const char* c_query = query_data
+        with nogil:
+            status = AdbcStatementSetSqlQuery(
+                &self.statement, c_query, &c_error)
         check_error(status, &c_error)
 
-    def set_substrait_plan(self, plan: bytes) -> None:
+    def set_substrait_plan(self, bytes plan not None) -> None:
         """Set a Substrait plan to be executed."""
         cdef CAdbcError c_error = empty_error()
-        status = AdbcStatementSetSubstraitPlan(
-            &self.statement, plan, len(plan), &c_error)
+        cdef const uint8_t* c_plan = <const uint8_t*> plan
+        cdef size_t length = len(plan)
+        with nogil:
+            status = AdbcStatementSetSubstraitPlan(
+                &self.statement, c_plan, length, &c_error)
         check_error(status, &c_error)
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index 396a158..33c0b68 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -24,9 +24,12 @@ import functools
 import time
 import typing
 import warnings
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 
-import pyarrow
+try:
+    import pyarrow
+except ImportError as e:
+    raise ImportError("PyArrow is required for the DBAPI-compatible interface") from e
 
 from . import _lib
 
@@ -291,8 +294,7 @@ class Cursor(_Closeable):
             self._results.close()
         self._stmt.close()
 
-    def execute(self, operation, parameters=None) -> None:
-        """Execute a query."""
+    def _prepare_execute(self, operation, parameters=None) -> None:
         self._results = None
         if operation != self._last_query:
             self._last_query = operation
@@ -314,6 +316,9 @@ class Cursor(_Closeable):
             rb._export_to_c(arr_handle.address, sch_handle.address)
             self._stmt.bind(arr_handle, sch_handle)
 
+    def execute(self, operation, parameters=None) -> None:
+        """Execute a query."""
+        self._prepare_execute(operation, parameters)
         handle, self._rowcount = self._stmt.execute_query()
         self._results = _RowIterator(
             pyarrow.RecordBatchReader._import_from_c(handle.address)
@@ -343,6 +348,37 @@ class Cursor(_Closeable):
         self._stmt.bind(arr_handle, sch_handle)
         self._rowcount = self._stmt.execute_update()
 
+    def execute_partitions(
+        self, operation, parameters=None
+    ) -> Tuple[List[bytes], pyarrow.Schema]:
+        """
+        Execute a query and get the partitions of a distributed result set.
+
+        This is an extension method, not present in DBAPI.
+
+        Returns
+        -------
+        partitions : list of bytes
+            A list of partition descriptors, which can be read with
+            read_partition.
+        schema : pyarrow.Schema
+            The schema of the result set.
+        """
+        self._prepare_execute(operation, parameters)
+        partitions, schema, self._rowcount = self._stmt.execute_partitions()
+        return partitions, pyarrow.Schema._import_from_c(schema.address)
+
+    def read_partition(self, partition: bytes) -> None:
+        """
+        Read a partition of a distributed result set.
+        """
+        self._results = None
+        handle = self.conn._conn.read_partition(partition)
+        self._rowcount = -1
+        self._results = _RowIterator(
+            pyarrow.RecordBatchReader._import_from_c(handle.address)
+        )
+
     def fetchone(self) -> tuple:
         """Fetch one row of the result."""
         if self._results is None:
diff --git a/python/adbc_driver_manager/poetry.lock b/python/adbc_driver_manager/poetry.lock
index b1b6e23..a9fe0d4 100644
--- a/python/adbc_driver_manager/poetry.lock
+++ b/python/adbc_driver_manager/poetry.lock
@@ -46,7 +46,7 @@ python-versions = "*"
 
 [[package]]
 name = "numpy"
-version = "1.23.1"
+version = "1.23.2"
 description = "NumPy is the fundamental package for array computing with Python."
 category = "main"
 optional = false
@@ -160,7 +160,7 @@ six = ">=1.5"
 
 [[package]]
 name = "pytz"
-version = "2022.1"
+version = "2022.2.1"
 description = "World timezone definitions, modern and historical"
 category = "main"
 optional = false
@@ -182,10 +182,13 @@ category = "dev"
 optional = false
 python-versions = ">=3.7"
 
+[extras]
+dbapi = ["pyarrow"]
+
 [metadata]
 lock-version = "1.1"
 python-versions = ">=3.8"
-content-hash = "6929ad1880dbb184988a7019b239c2f93bd6f20cd186a12ae64c96335bc945e7"
+content-hash = "2e6618ad5d2fd1f6893201d2243d43007e4e5c26727adf2b1cba7aa59f07945a"
 
 [metadata.files]
 atomicwrites = [
@@ -246,28 +249,34 @@ iniconfig = [
     {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"},
 ]
 numpy = [
-    {file = "numpy-1.23.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b15c3f1ed08df4980e02cc79ee058b788a3d0bef2fb3c9ca90bb8cbd5b8a3a04"},
-    {file = "numpy-1.23.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9ce242162015b7e88092dccd0e854548c0926b75c7924a3495e02c6067aba1f5"},
-    {file = "numpy-1.23.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e0d7447679ae9a7124385ccf0ea990bb85bb869cef217e2ea6c844b6a6855073"},
-    {file = "numpy-1.23.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3119daed207e9410eaf57dcf9591fdc68045f60483d94956bee0bfdcba790953"},
-    {file = "numpy-1.23.1-cp310-cp310-win32.whl", hash = "sha256:3ab67966c8d45d55a2bdf40701536af6443763907086c0a6d1232688e27e5447"},
-    {file = "numpy-1.23.1-cp310-cp310-win_amd64.whl", hash = "sha256:1865fdf51446839ca3fffaab172461f2b781163f6f395f1aed256b1ddc253622"},
-    {file = "numpy-1.23.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:aeba539285dcf0a1ba755945865ec61240ede5432df41d6e29fab305f4384db2"},
-    {file = "numpy-1.23.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7e8229f3687cdadba2c4faef39204feb51ef7c1a9b669247d49a24f3e2e1617c"},
-    {file = "numpy-1.23.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:68b69f52e6545af010b76516f5daaef6173e73353e3295c5cb9f96c35d755641"},
-    {file = "numpy-1.23.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1408c3527a74a0209c781ac82bde2182b0f0bf54dea6e6a363fe0cc4488a7ce7"},
-    {file = "numpy-1.23.1-cp38-cp38-win32.whl", hash = "sha256:47f10ab202fe4d8495ff484b5561c65dd59177949ca07975663f4494f7269e3e"},
-    {file = "numpy-1.23.1-cp38-cp38-win_amd64.whl", hash = "sha256:37e5ebebb0eb54c5b4a9b04e6f3018e16b8ef257d26c8945925ba8105008e645"},
-    {file = "numpy-1.23.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:173f28921b15d341afadf6c3898a34f20a0569e4ad5435297ba262ee8941e77b"},
-    {file = "numpy-1.23.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:876f60de09734fbcb4e27a97c9a286b51284df1326b1ac5f1bf0ad3678236b22"},
-    {file = "numpy-1.23.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:35590b9c33c0f1c9732b3231bb6a72d1e4f77872390c47d50a615686ae7ed3fd"},
-    {file = "numpy-1.23.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a35c4e64dfca659fe4d0f1421fc0f05b8ed1ca8c46fb73d9e5a7f175f85696bb"},
-    {file = "numpy-1.23.1-cp39-cp39-win32.whl", hash = "sha256:c2f91f88230042a130ceb1b496932aa717dcbd665350beb821534c5c7e15881c"},
-    {file = "numpy-1.23.1-cp39-cp39-win_amd64.whl", hash = "sha256:37ece2bd095e9781a7156852e43d18044fd0d742934833335599c583618181b9"},
-    {file = "numpy-1.23.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:8002574a6b46ac3b5739a003b5233376aeac5163e5dcd43dd7ad062f3e186129"},
-    {file = "numpy-1.23.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d732d17b8a9061540a10fda5bfeabca5785700ab5469a5e9b93aca5e2d3a5fb"},
-    {file = "numpy-1.23.1-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:55df0f7483b822855af67e38fb3a526e787adf189383b4934305565d71c4b148"},
-    {file = "numpy-1.23.1.tar.gz", hash = "sha256:d748ef349bfef2e1194b59da37ed5a29c19ea8d7e6342019921ba2ba4fd8b624"},
+    {file = "numpy-1.23.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e603ca1fb47b913942f3e660a15e55a9ebca906857edfea476ae5f0fe9b457d5"},
+    {file = "numpy-1.23.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:633679a472934b1c20a12ed0c9a6c9eb167fbb4cb89031939bfd03dd9dbc62b8"},
+    {file = "numpy-1.23.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:17e5226674f6ea79e14e3b91bfbc153fdf3ac13f5cc54ee7bc8fdbe820a32da0"},
+    {file = "numpy-1.23.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bdc02c0235b261925102b1bd586579b7158e9d0d07ecb61148a1799214a4afd5"},
+    {file = "numpy-1.23.2-cp310-cp310-win32.whl", hash = "sha256:df28dda02c9328e122661f399f7655cdcbcf22ea42daa3650a26bce08a187450"},
+    {file = "numpy-1.23.2-cp310-cp310-win_amd64.whl", hash = "sha256:8ebf7e194b89bc66b78475bd3624d92980fca4e5bb86dda08d677d786fefc414"},
+    {file = "numpy-1.23.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:dc76bca1ca98f4b122114435f83f1fcf3c0fe48e4e6f660e07996abf2f53903c"},
+    {file = "numpy-1.23.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ecfdd68d334a6b97472ed032b5b37a30d8217c097acfff15e8452c710e775524"},
+    {file = "numpy-1.23.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5593f67e66dea4e237f5af998d31a43e447786b2154ba1ad833676c788f37cde"},
+    {file = "numpy-1.23.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ac987b35df8c2a2eab495ee206658117e9ce867acf3ccb376a19e83070e69418"},
+    {file = "numpy-1.23.2-cp311-cp311-win32.whl", hash = "sha256:d98addfd3c8728ee8b2c49126f3c44c703e2b005d4a95998e2167af176a9e722"},
+    {file = "numpy-1.23.2-cp311-cp311-win_amd64.whl", hash = "sha256:8ecb818231afe5f0f568c81f12ce50f2b828ff2b27487520d85eb44c71313b9e"},
+    {file = "numpy-1.23.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:909c56c4d4341ec8315291a105169d8aae732cfb4c250fbc375a1efb7a844f8f"},
+    {file = "numpy-1.23.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8247f01c4721479e482cc2f9f7d973f3f47810cbc8c65e38fd1bbd3141cc9842"},
+    {file = "numpy-1.23.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b8b97a8a87cadcd3f94659b4ef6ec056261fa1e1c3317f4193ac231d4df70215"},
+    {file = "numpy-1.23.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd5b7ccae24e3d8501ee5563e82febc1771e73bd268eef82a1e8d2b4d556ae66"},
+    {file = "numpy-1.23.2-cp38-cp38-win32.whl", hash = "sha256:9b83d48e464f393d46e8dd8171687394d39bc5abfe2978896b77dc2604e8635d"},
+    {file = "numpy-1.23.2-cp38-cp38-win_amd64.whl", hash = "sha256:dec198619b7dbd6db58603cd256e092bcadef22a796f778bf87f8592b468441d"},
+    {file = "numpy-1.23.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:4f41f5bf20d9a521f8cab3a34557cd77b6f205ab2116651f12959714494268b0"},
+    {file = "numpy-1.23.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:806cc25d5c43e240db709875e947076b2826f47c2c340a5a2f36da5bb10c58d6"},
+    {file = "numpy-1.23.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f9d84a24889ebb4c641a9b99e54adb8cab50972f0166a3abc14c3b93163f074"},
+    {file = "numpy-1.23.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c403c81bb8ffb1c993d0165a11493fd4bf1353d258f6997b3ee288b0a48fce77"},
+    {file = "numpy-1.23.2-cp39-cp39-win32.whl", hash = "sha256:cf8c6aed12a935abf2e290860af8e77b26a042eb7f2582ff83dc7ed5f963340c"},
+    {file = "numpy-1.23.2-cp39-cp39-win_amd64.whl", hash = "sha256:5e28cd64624dc2354a349152599e55308eb6ca95a13ce6a7d5679ebff2962913"},
+    {file = "numpy-1.23.2-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:806970e69106556d1dd200e26647e9bee5e2b3f1814f9da104a943e8d548ca38"},
+    {file = "numpy-1.23.2-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bd879d3ca4b6f39b7770829f73278b7c5e248c91d538aab1e506c628353e47f"},
+    {file = "numpy-1.23.2-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:be6b350dfbc7f708d9d853663772a9310783ea58f6035eec649fb9c4371b5389"},
+    {file = "numpy-1.23.2.tar.gz", hash = "sha256:b78d00e48261fbbd04aa0d7427cf78d18401ee0abd89c7559bbf422e5b1c7d01"},
 ]
 packaging = [
     {file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"},
@@ -318,8 +327,8 @@ python-dateutil = [
     {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"},
 ]
 pytz = [
-    {file = "pytz-2022.1-py2.py3-none-any.whl", hash = "sha256:e68985985296d9a66a881eb3193b0906246245294a881e7c8afe623866ac6a5c"},
-    {file = "pytz-2022.1.tar.gz", hash = "sha256:1e760e2fe6a8163bc0b3d9a19c4f84342afa0a2affebfaa84b01b978a02ecaa7"},
+    {file = "pytz-2022.2.1-py2.py3-none-any.whl", hash = "sha256:220f481bdafa09c3955dfbdddb7b57780e9a94f5127e35456a48589b9e0c0197"},
+    {file = "pytz-2022.2.1.tar.gz", hash = "sha256:cea221417204f2d1a2aa03ddae3e867921971d0d76f14d87abb4414415bbdcf5"},
 ]
 six = [
     {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
diff --git a/python/adbc_driver_manager/pyproject.toml b/python/adbc_driver_manager/pyproject.toml
index 116a925..618025b 100644
--- a/python/adbc_driver_manager/pyproject.toml
+++ b/python/adbc_driver_manager/pyproject.toml
@@ -29,15 +29,19 @@ script = "build.py"
 
 [tool.poetry.dependencies]
 pandas = { version = ">=1.2,<2", optional = true }
-pyarrow = ">=8.0.0"
+pyarrow = { version = ">=8.0.0", optional = true }
 python = ">=3.8"
 
 [tool.poetry.dev-dependencies]
 Cython = "^0.29.32"
 pandas = ">=1.2"
+pyarrow = ">=8.0.0"
 pytest = "^7.1.2"
 setuptools = "^63.4.0"
 
+[tool.poetry.extras]
+dbapi = ["pyarrow"]
+
 [build-system]
 requires = ["Cython", "poetry-core>=1.0.0", "setuptools"]
 build-backend = "poetry.core.masonry.api"