You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/04/21 00:47:54 UTC
[arrow] branch master updated: ARROW-15777: [Python][Flight] Allow passing IpcReadOptions to FlightCallOptions
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b9952840be ARROW-15777: [Python][Flight] Allow passing IpcReadOptions to FlightCallOptions
b9952840be is described below
commit b9952840be6ff7234b416b5b80a48ecd7a5ecf60
Author: Raúl Cumplido <ra...@gmail.com>
AuthorDate: Wed Apr 20 20:47:39 2022 -0400
ARROW-15777: [Python][Flight] Allow passing IpcReadOptions to FlightCallOptions
The aim of this PR is to allow passing the newly added `pyarrow.lib.IpcReadOptions` to `FlightCallOptions`.
Closes #12939 from raulcd/ARROW-15777
Authored-by: Raúl Cumplido <ra...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
python/pyarrow/_flight.pyx | 11 ++++++++++-
python/pyarrow/includes/libarrow_flight.pxd | 1 +
python/pyarrow/tests/test_flight.py | 26 +++++++++++++++++++++++++-
3 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index c7cd19f7de..bd2f2e35a9 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -107,7 +107,8 @@ cdef class FlightCallOptions(_Weakrefable):
cdef:
CFlightCallOptions options
- def __init__(self, timeout=None, write_options=None, headers=None):
+ def __init__(self, timeout=None, write_options=None, headers=None,
+ IpcReadOptions read_options=None):
"""Create call options.
Parameters
@@ -120,14 +121,22 @@ cdef class FlightCallOptions(_Weakrefable):
by environment variables (see pyarrow.ipc).
headers : List[Tuple[str, str]], optional
A list of arbitrary headers as key, value tuples
+ read_options : pyarrow.ipc.IpcReadOptions, optional
+ Serialization options for reading IPC format.
"""
cdef IpcWriteOptions c_write_options
+ cdef IpcReadOptions c_read_options
if timeout is not None:
self.options.timeout = CTimeoutDuration(timeout)
if write_options is not None:
c_write_options = _get_options(write_options)
self.options.write_options = c_write_options.c_options
+ if read_options is not None:
+ if not isinstance(read_options, IpcReadOptions):
+ raise TypeError("expected IpcReadOptions, got {}"
+ .format(type(read_options)))
+ self.options.read_options = read_options.c_options
if headers is not None:
self.options.headers = headers
diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd
index 5c70b06112..3698292b5a 100644
--- a/python/pyarrow/includes/libarrow_flight.pxd
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -227,6 +227,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CFlightCallOptions()
CTimeoutDuration timeout
CIpcWriteOptions write_options
+ CIpcReadOptions read_options
vector[pair[c_string, c_string]] headers
CStopToken stop_token
diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py
index 56a033df41..fbf478a7cd 100644
--- a/python/pyarrow/tests/test_flight.py
+++ b/python/pyarrow/tests/test_flight.py
@@ -32,7 +32,7 @@ import numpy as np
import pytest
import pyarrow as pa
-from pyarrow.lib import tobytes
+from pyarrow.lib import IpcReadOptions, tobytes
from pyarrow.util import find_free_port
from pyarrow.tests import util
@@ -119,6 +119,12 @@ def simple_dicts_table():
return pa.Table.from_arrays(data, names=['some_dicts'])
+def multiple_column_table():
+ return pa.Table.from_arrays([pa.array(['foo', 'bar', 'baz', 'qux']),
+ pa.array([1, 2, 3, 4])],
+ names=['a', 'b'])
+
+
class ConstantFlightServer(FlightServerBase):
"""A Flight server that always returns the same data.
@@ -134,6 +140,7 @@ class ConstantFlightServer(FlightServerBase):
self.table_factories = {
b'ints': simple_ints_table,
b'dicts': simple_dicts_table,
+ b'multi': multiple_column_table,
}
self.options = options
@@ -1164,6 +1171,23 @@ def test_timeout_passes():
client.do_get(flight.Ticket(b'ints'), options=options).read_all()
+def test_read_options():
+ """Make sure ReadOptions can be used."""
+ expected = pa.Table.from_arrays([pa.array([1, 2, 3, 4])], names=["b"])
+ with ConstantFlightServer() as server, \
+ FlightClient(('localhost', server.port)) as client:
+ options = flight.FlightCallOptions(
+ read_options=IpcReadOptions(included_fields=[1]))
+ response1 = client.do_get(flight.Ticket(
+ b'multi'), options=options).read_all()
+ response2 = client.do_get(flight.Ticket(b'multi')).read_all()
+
+ assert response2.num_columns == 2
+ assert response1.num_columns == 1
+ assert response1 == expected
+ assert response2 == multiple_column_table()
+
+
basic_auth_handler = HttpBasicServerAuthHandler(creds={
b"test": b"p4ssw0rd",
})