Posted to user@arrow.apache.org by R KB <ro...@gmail.com> on 2022/02/08 18:24:35 UTC

Is it possible to use python asyncio with Flight?

GRPC has pretty good AsyncIO support at this point, and since Flight is
essentially a wrapper around some GRPC types: why can't we just expose
something that generates FlightData grpc objects?

Re: Is it possible to use python asyncio with Flight?

Posted by David Li <li...@apache.org>.
Great! I should really get around to that Jira so we have some better docs about this.

-David

On Tue, Feb 8, 2022, at 17:23, R KB wrote:
> Thanks David,
> 
> Got it working :) 
> 
> On Tue, 8 Feb 2022 at 21:41, David Li <li...@apache.org> wrote:
>> IIRC header_length does not include those first 8 bytes, so there's an offset needed in all slices here.
>> 
>> Also, the first FlightData is expected to contain the schema.
>> 
>> On Tue, Feb 8, 2022, at 16:32, R KB wrote:
>>> Yeah, I think we're getting into the voided-warranty domain.
>>> 
>>> I've done this:
>>> 
>>> class FlightService(flight_grpc.FlightServiceServicer):
>>>     def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
>>>         tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
>>>         batches = tbl.to_batches()
>>>         buf = batches[0].serialize()
>>> 
>>>         buffer_ = buf.to_pybytes()
>>>         header_length = struct.unpack("<I", buffer_[4:8])[0]
>>> 
>>>         yield flight_proto.FlightData(
>>>             flight_descriptor=flight_proto.FlightDescriptor(),
>>>             data_header=buffer_[8:header_length],
>>>             data_body=buffer_[header_length+8:]
>>>         )
>>> 
>>> 
>>> 
>>> Which looks kinda right... but I'm getting: 
>>> 
>>> FlightInternalError: Server never sent a data message
>>> 
>>> When I try to request with the flight client.
>>>> 
>>> 
>>> 
>>> On Tue, 8 Feb 2022 at 20:58, David Li <li...@apache.org> wrote:
>>>> Ah, right. You'd first serialize the batch to a byte buffer, then read the flatbuffer length from the second 4 bytes (since the first 4 bytes are the IPC continuation token), slice the buffer, and use that as the message header, and the remainder of the buffer as the message body. (But this is starting to get into "your warranty is void"/"double-check with the Arrow specification" territory.)
>>>> 
>>>> -David
>>>> 
>>>> On Tue, Feb 8, 2022, at 15:25, R KB wrote:
>>>>> I'm confident I can do that with a RecordBatchWriter and a BytesIO object that I can get the value of; however, what do I do about the message header?
>>>>> 
>>>>> On Tue, 8 Feb 2022 at 19:17, David Li <li...@apache.org> wrote:
>>>>>> No, you'd have to serialize the buffer, chop off the first 8 bytes yourself, and generate the Protobuf.
>>>>>> 
>>>>>> On Tue, Feb 8, 2022, at 13:59, R KB wrote:
>>>>>>> Thanks for that; hopefully those links should be fruitful.
>>>>>>> 
>>>>>>> Question, before I get started, is there an API exposed so that I could do the reverse of this: 
>>>>>>> 
>>>>>>> import asyncio
>>>>>>> import pathlib
>>>>>>> import struct
>>>>>>> 
>>>>>>> import grpc
>>>>>>> import pyarrow as pa
>>>>>>> import pyarrow.flight as pf
>>>>>>> 
>>>>>>> import Flight_pb2, Flight_pb2_grpc
>>>>>>> 
>>>>>>> async def main():
>>>>>>>     ticket = pf.Ticket("tick")
>>>>>>>     async with grpc.aio.insecure_channel("localhost:1234") as channel:
>>>>>>>         stub = Flight_pb2_grpc.FlightServiceStub(channel)
>>>>>>>         schema = None
>>>>>>>         async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
>>>>>>>             # 4 bytes: Need IPC continuation token
>>>>>>>             token = b'\xff\xff\xff\xff'
>>>>>>>             # 4 bytes: message length (little-endian)
>>>>>>>             length = struct.pack('<I', len(data.data_header))
>>>>>>>             buf = pa.py_buffer(token + length + data.data_header + data.data_body)
>>>>>>>             message = pa.ipc.read_message(buf)
>>>>>>>             print(message)
>>>>>>>             if schema is None:
>>>>>>>                 # This should work but is unimplemented
>>>>>>>                 # print(pa.ipc.read_schema(message))
>>>>>>>                 schema = pa.ipc.read_schema(buf)
>>>>>>>                 print(schema)
>>>>>>>             else:
>>>>>>>                 batch = pa.ipc.read_record_batch(message, schema)
>>>>>>>                 print(batch)
>>>>>>>                 print(batch.to_pydict())
>>>>>>> 
>>>>>>> asyncio.run(main())
>>>>>>> 
>>>>>>> On Tue, 8 Feb 2022 at 18:33, David Li <li...@apache.org> wrote:
>>>>>>>> Unfortunately Flight wraps the C++ Flight implementation, which uses gRPC/C++, a library largely separate from grpcio that does not benefit from any improvements there. (The two do share a common network stack, but that's all; also, grpcio doesn't expose any of the lower-level APIs that might make it possible to combine the two somehow.)
>>>>>>>> 
>>>>>>>> You might ask why pyarrow.flight didn't use grpcio directly (with bindings to transmute from FlightData to RecordBatch). However, at the time the thinking was that we would also have non-gRPC transports (which are finally being worked on), so a from-scratch grpcio/Python implementation was not desirable.
>>>>>>>> 
>>>>>>>> That said, there are issues filed about better documenting FlightData. See ARROW-15287[1], which links a StackOverflow answer that demonstrates how to glue together asyncio/grpcio/PyArrow.
>>>>>>>> 
>>>>>>>> There's also some previous discussion about adding async to Flight more generally [2].
>>>>>>>> 
>>>>>>>> [1]: https://issues.apache.org/jira/browse/ARROW-15287
>>>>>>>> [2]: "[C++] Async Arrow Flight" 2021/06/02 https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv
>>>>>>>> 
>>>>>>>> -David
>>>>>>>> 
>>>>>>>> On Tue, Feb 8, 2022, at 13:24, R KB wrote:
>>>>>>>>> GRPC has pretty good AsyncIO support at this point, and since Flight is essentially a wrapper around some GRPC types: why can't we just expose something that generates FlightData grpc objects?

Re: Is it possible to use python asyncio with Flight?

Posted by R KB <ro...@gmail.com>.
Thanks David,

Got it working :)

Re: Is it possible to use python asyncio with Flight?

Posted by David Li <li...@apache.org>.
IIRC header_length does not include those first 8 bytes, so there's an offset needed in all slices here.

Also, the first FlightData is expected to contain the schema.

Re: Is it possible to use python asyncio with Flight?

Posted by R KB <ro...@gmail.com>.
Yeah, I think we're getting into the voided-warranty domain.

I've done this:

class FlightService(flight_grpc.FlightServiceServicer):
    def DoGet(self, request, ctx) -> Iterator[flight_proto.FlightData]:
        tbl = pyarrow.Table.from_pydict({"hello_world": [1,2,3]})
        batches = tbl.to_batches()
        buf = batches[0].serialize()

        buffer_ = buf.to_pybytes()
        header_length = struct.unpack("<I", buffer_[4:8])[0]

        yield flight_proto.FlightData(
            flight_descriptor=flight_proto.FlightDescriptor(),
            data_header=buffer_[8:header_length],
            data_body=buffer_[header_length+8:]
        )



Which looks kinda right... but I'm getting:

FlightInternalError: Server never sent a data message

When I try to request with the flight client.

Re: Is it possible to use python asyncio with Flight?

Posted by David Li <li...@apache.org>.
Ah, right. You'd first serialize the batch to a byte buffer, then read the flatbuffer length from the second 4 bytes (since the first 4 bytes are the IPC continuation token), slice the buffer, and use that as the message header, and the remainder of the buffer as the message body. (But this is starting to get into "your warranty is void"/"double-check with the Arrow specification" territory.)

-David
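The slicing described above can be sketched with only the standard library. This is a minimal illustration, assuming the IPC framing used since Arrow 0.15 (a 4-byte continuation token `0xFFFFFFFF`, then a 4-byte little-endian metadata length that excludes those 8 prefix bytes); `split_ipc_message` is a hypothetical name, not a pyarrow API.

```python
import struct
from typing import Tuple

IPC_CONTINUATION = b"\xff\xff\xff\xff"


def split_ipc_message(buf: bytes) -> Tuple[bytes, bytes]:
    """Split one encapsulated IPC message into (data_header, data_body).

    Layout assumed: continuation token (4 bytes), metadata length
    (4 bytes, little-endian), flatbuffer metadata plus padding, body.
    """
    if len(buf) < 8 or buf[:4] != IPC_CONTINUATION:
        raise ValueError("buffer does not start with the IPC continuation token")
    (metadata_length,) = struct.unpack("<I", buf[4:8])
    header_end = 8 + metadata_length  # the length excludes the 8 prefix bytes
    if header_end > len(buf):
        raise ValueError("metadata length runs past the end of the buffer")
    return buf[8:header_end], buf[header_end:]
```

With pyarrow, `buf` would come from something like `batch.serialize().to_pybytes()`; the two returned byte strings map onto FlightData's `data_header` and `data_body` fields.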

Re: Is it possible to use python asyncio with Flight?

Posted by R KB <ro...@gmail.com>.
I'm confident I can do that with a RecordBatchWriter and a BytesIO object
that I can get the value of; however, what do I do about the message header?

Re: Is it possible to use python asyncio with Flight?

Posted by David Li <li...@apache.org>.
No, you'd have to serialize the buffer, chop off the first 8 bytes yourself, and generate the Protobuf.

Re: Is it possible to use python asyncio with Flight?

Posted by R KB <ro...@gmail.com>.
Thanks for that; hopefully those links should be fruitful.

Question, before I get started, is there an API exposed so that I could do
the reverse of this:

import asyncio
import pathlib
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import Flight_pb2, Flight_pb2_grpc


async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
            # 4 bytes: Need IPC continuation token
            token = b'\xff\xff\xff\xff'
            # 4 bytes: message length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # This should work but is unimplemented
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())

asyncio.run(main())
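The client loop above glues the 4-byte continuation marker and a little-endian length onto each FlightData header before handing it to `pa.ipc.read_message`. Going the other way is just slicing. Below is a minimal, stdlib-only sketch of both directions. `frame_message` and `split_message` are hypothetical helper names. The sketch assumes that a serialized message starts with the continuation marker followed by a 4-byte length, and that this length does not count those 8 prefix bytes, as noted elsewhere in this thread.

```python
import struct
from typing import Tuple

# Encapsulated Arrow IPC messages start with this 4-byte continuation marker.
CONTINUATION = b"\xff\xff\xff\xff"


def frame_message(data_header: bytes, data_body: bytes) -> bytes:
    """Rebuild a serialized IPC message from FlightData fields (what the loop above does)."""
    length = struct.pack("<I", len(data_header))  # little-endian header length
    return CONTINUATION + length + data_header + data_body


def split_message(buf: bytes) -> Tuple[bytes, bytes]:
    """Reverse of frame_message: slice a serialized message into (data_header, data_body)."""
    if buf[:4] != CONTINUATION:
        raise ValueError("expected the IPC continuation marker")
    (header_length,) = struct.unpack("<I", buf[4:8])
    # The length does not count the 8 prefix bytes, so both slices are offset by 8.
    data_header = buf[8:8 + header_length]
    data_body = buf[8 + header_length:]
    return data_header, data_body
```

For example, `split_message(frame_message(b"\x01" * 16, b"\x02" * 24))` gives back the same header and body. With pyarrow installed, passing `batch.serialize().to_pybytes()` to `split_message` should yield the two fields for one record batch. The first FlightData on a stream is still expected to carry the schema.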



Re: Is it possible to use python asyncio with Flight?

Posted by David Li <li...@apache.org>.
Unfortunately Flight wraps the C++ Flight implementation, which uses gRPC/C++, a library almost entirely separate from grpcio that does not benefit from any improvements there. (The two do share a common network stack, but that's all; also, grpcio doesn't expose any of the lower-level APIs that might make it possible to combine the two somehow.)

You might ask why pyarrow.flight didn't use grpcio directly (with bindings to transmute from FlightData to RecordBatch). However, at the time the thinking was that we would also have non-gRPC transports (which are finally being worked on), so a from-scratch grpcio/Python implementation was not desirable.

That said, there are issues filed about better documenting FlightData. See ARROW-15287[1], which links a StackOverflow answer that demonstrates how to glue together asyncio/grpcio/PyArrow.

There's also some previous discussion about adding async to Flight more generally [2].

[1]: https://issues.apache.org/jira/browse/ARROW-15287
[2]: "[C++] Async Arrow Flight" 2021/06/02 https://lists.apache.org/thread/jrj6yx53gyj0tr18pfdghtb8krp4gpfv

-David

On Tue, Feb 8, 2022, at 13:24, R KB wrote:
> gRPC has pretty good asyncio support at this point, and since Flight is essentially a wrapper around some gRPC types, why can't we just expose something that generates FlightData gRPC objects?