Posted to user@arrow.apache.org by "Alex McRae (CW)" <al...@dremio.com> on 2022/02/14 17:27:29 UTC

Correct usage of listener.start() when root is shared with FlightStream

Hi team,

We are building a Flight service in Java that proxies requests, and we are currently trying to get getStream working on the FlightProducer.

The code looks similar to this:

public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    FlightStream flightStream = this.client.getStream(ticket);
    while (flightStream.next()) {
        if (!flightStream.hasRoot()) { break; }

        listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
        listener.putNext();
    }
    listener.completed();
}

We are unsure whether this is valid usage. I have looked at OutboundStreamListenerImpl.java, and it looks like calling start() on the listener causes it to resend some schema messages.
We are trying to understand how to handle the case where flightStream.getRoot() returns a different VectorSchemaRoot than the previous call.

For more context, we have also tried:

public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    FlightStream flightStream = this.client.getStream(ticket);
    listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
    while (flightStream.next()) {
        if (!flightStream.hasRoot()) { break; }

        listener.putNext();
    }
    listener.completed();
}

With this version, however, we ran into issues with the connection not closing. We believe this is caused by the VectorSchemaRoot changing on flightStream.next() calls, which is an issue because we are sharing the root between the FlightStream and the ServerStreamListener.
https://github.com/dremio-hub/arrow-flight-client-examples is the client we are using to test this end to end.

Please let me know if you can provide any clarity; I would be happy to update the documentation afterwards.

Sincerely,
Alex McRae
alex.mcrae@dremio.com
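
As a rough illustration of the situation described above, the following plain-Java sketch models a listener that remembers the root passed to start() while the upstream stream swaps its root between batches. Root, Source, and Sink are hypothetical stand-ins for VectorSchemaRoot, FlightStream, and ServerStreamListener, not Arrow classes, and the model assumes the worst case of a new root object on every next() call. The proxy calls start() once on a root it owns and copies each incoming batch into that root before putNext(), which is the role the replies in this thread assign to VectorUnloader/VectorLoader.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "call start() once, copy every batch into a root you own". */
final class StableRootSketch {

    /** Hypothetical stand-in for a VectorSchemaRoot: a mutable batch of values. */
    static final class Root {
        int[] values = new int[0];
    }

    /** Hypothetical stand-in for an upstream stream whose root may change between batches. */
    static final class Source {
        private final int[][] batches;
        private int index = -1;
        private Root current = new Root();

        Source(int[][] batches) {
            this.batches = batches;
        }

        boolean next() {
            if (++index >= batches.length) {
                return false;
            }
            current = new Root();            // assumed worst case: a fresh root per batch
            current.values = batches[index];
            return true;
        }

        Root getRoot() {
            return current;
        }
    }

    /** Hypothetical stand-in for a listener: it reads only from the root given to start(). */
    static final class Sink {
        private Root registered;
        int schemaMessages = 0;
        final List<int[]> sent = new ArrayList<>();

        void start(Root root) {
            registered = root;
            schemaMessages++;
        }

        void putNext() {
            sent.add(registered.values.clone());  // copies the registered root's current contents
        }
    }

    /** Proxies every batch from the source to a new sink through a root owned by the proxy. */
    static Sink proxy(Source source) {
        Sink sink = new Sink();
        Root owned = new Root();
        sink.start(owned);                   // exactly once, before the loop
        while (source.next()) {
            int[] incoming = source.getRoot().values;
            owned.values = Arrays.copyOf(incoming, incoming.length);  // plays the role of unload + load
            sink.putNext();
        }
        return sink;
    }

    public static void main(String[] args) {
        Sink sink = proxy(new Source(new int[][] {{1, 2}, {3}, {4, 5, 6}}));
        System.out.println("schema messages: " + sink.schemaMessages);
        for (int[] batch : sink.sent) {
            System.out.println(Arrays.toString(batch));
        }
    }
}
```

Under these assumptions the schema is announced once and every batch reaches the sink, no matter how often the upstream root object changes.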

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by Jacques Nadeau <ja...@apache.org>.
There are definitely some very important rules around the use of threads
when using the async grpc/netty stuff. I don't remember them offhand. If I
recall correctly, the default behavior for netty is that all work associated
with a connection is done on the same thread. I think the code is called
HashedWheelTimer.

Note, this all goes back to when I wrote the original code for backpressure
and the associated tests so memory has definitely faded.

On Fri, Mar 11, 2022, 6:20 PM James Duong <ja...@bitquilltech.com> wrote:

> This was patching the application code (the FlightProducer implementation).
>
> I wouldn't say this happened under heavy load, but it would consistently
> happen with queries that returned enough data in one chunk that the client
> would need to report being busy.
>
> The behavior you're describing seems to indicate that the handlers should
> run on any thread available to gRPC, but my observation was that the
> handler would run on the thread that getStream was called on. I haven't
> looked through the code to see if that is the case, though.
>
> On Fri., Mar. 11, 2022, 17:47 David Li, <li...@apache.org> wrote:
>
>> Hmm, you mean patching Flight itself? Or the application code? (Sounds
>> like the latter?)
>>
>> Just curious - were you seeing this hanging only under load, and with a
>> fixed thread pool configured on the gRPC server? There is a pitfall there
>> due to how gRPC is implemented (certain internal callbacks, including the
>> one that sets the isCancelled flag IIRC, are run on the same thread pool as
>> RPC handlers, so if 1) your RPC handlers are synchronous, 2) your thread
>> pool has fixed capacity, and 3) all threads are servicing a call, then they
>> can get stuck forever because the gRPC internal callbacks can never run).
>>
>> On Fri, Mar 11, 2022, at 20:11, James Duong wrote:
>>
>> Following up on this, we were able to get past the hanging issue by
>> changing the FlightProducer getStream() implementation to send its work
>> (including the blocking on CallbackBackpressureStrategy) to a background
>> thread instead of the gRPC thread running it.
>>
>> On Tue, Mar 8, 2022 at 4:26 PM James Duong <ja...@bitquilltech.com>
>> wrote:
>>
>> We see backpressure-related timeouts. I'm thinking there's an issue with
>> CallbackBackpressureStrategy relying on ServerStreamListener#isReady().
>> I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.
>>
>> We're going to try a fix for this locally and, if it helps, create a PR.
>>
>> On Mon, Mar 7, 2022 at 4:19 PM David Li <li...@apache.org> wrote:
>>
>>
>> So you're finding that if you remove the backpressure handler, there are
>> no problems?
>>
>> Is the timeout a gRPC timeout? Do you know if any messages are making it
>> through, or is it timing out after a period of no activity at all?
>>
>> On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
>>
>> Hi David,
>>
>> We believe we have a data race somewhere as debugging the code above
>> causes no issues but running it without the debugger causes a timeout. We
>> were trying to investigate if putNext kept around a reference to the data
>> in the VectorSchemaRoot. Given that we are getting a timeout with the
>> backpressure we think it is possible the code
>> https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
>> may be the culprit.
>>
>> On Fri, Mar 4, 2022 at 2:35 PM David Li <li...@apache.org> wrote:
>>
>>
>> It should be safe. Are you seeing any issues?
>>
>> Flight waits for an explicit next()/putNext() to actually touch anything.
>> And once they return, Flight will not read or mutate your data. So for
>> instance, calling putNext() copies the Arrow data into a gRPC buffer, after
>> which you can reuse the Arrow buffers. (This is *not* true if you have
>> zero-copy writes enabled. In that case we poke a reference to the Arrow
>> buffers into the gRPC data structure and so mutating your Arrow buffer will
>> mysteriously change "previously written" data.)
>>
>> It's been years since I've touched this, though, so the details here are
>> fuzzy to me...
>>
>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>
>> Hi all,
>>
>> Related to this issue and the solution below, we were wondering if it is
>> safe to call VectorLoader.load() before checking whether the client is ready
>> when using a backpressure strategy. The thinking is that the client may
>> still be reading data from the root, so calling load() may cause a data race.
>>
>> public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
>>     final FlightStream flightStream = this.client.getStream(ticket);
>>
>>     VectorSchemaRoot clientRoot = flightStream.getRoot();
>>     VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);
>>
>>     final VectorSchemaRoot root = new VectorSchemaRoot(clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>     final VectorLoader vectorLoader = new VectorLoader(root);
>>     listener.start(root, flightStream.getDictionaryProvider());
>>
>>     while (flightStream.next()) {
>>         if (!flightStream.hasRoot()) {
>>             break;
>>         }
>>
>>         if (flightStream.getRoot() != clientRoot) {
>>             clientRoot = flightStream.getRoot();
>>             vectorUnloader = new VectorUnloader(clientRoot);
>>         }
>>
>>         // is this safe to happen before the client is ready?
>>         vectorLoader.load(vectorUnloader.getRecordBatch());
>>
>>         // this uses the built-in CallbackBackpressureStrategy
>>         final BackpressureStrategy.WaitResult waitResult = backpressureStrategy.waitForListener(timeout);
>>         if (waitResult == BackpressureStrategy.WaitResult.READY) {
>>             listener.putNext();
>>         }
>>     }
>>
>>     listener.completed();
>> }
>>
>> Let me know what you think.
>>
>> Sincerely,
>>
>> Alex McRae
>>
>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <al...@dremio.com>
>> wrote:
>>
>> Absolutely!
>>
>> On Mon, Feb 28, 2022 at 10:06 AM David Li <li...@apache.org> wrote:
>>
>>
>> Just to make sure this doesn't get forgotten: I filed
>> https://github.com/apache/arrow-cookbook/issues/158 for providing an
>> example of this.
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>
>> It should be safe. The fast-read path has pretty much always been enabled
>> and I'm not aware of issues it causes. (The fast-read path simply calls an
>> internal gRPC method to avoid bouncing through byte[], but we're still
>> copying the data into Arrow, now that I look at it.)
>>
>> The fast-write path is not relevant here, but that's the one that is
>> trickier to use. We should make sure the optimization(s) are properly
>> documented, since, looking through the code, the consequences are not
>> really explained (or at least the flag in ArrowMessage should reference
>> setUseZeroCopy, and we should have a doc page for these env vars analogous
>> to ARROW-15617 for C++).
>>
>> On a side note, digging around to refresh my memory shows that gRPC Java
>> *finally* introduced a zero-copy Protobuf deserialization path. I'm not
>> sure it's quite relevant for us, since we still need to get the data into
>> an off-heap buffer in the end, but I need to take a closer look. (See
>> grpc/grpc-java#8102.)
>>
>> -David
>>
>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>
>> Thanks for the tip David.
>>
>> Do you know if zero copy can be used safely on the ServerStreamListener
>> when using the VectorUnloader/Loader pattern above?
>>
>> On Mon, Feb 14, 2022 at 9:38 AM David Li <li...@apache.org> wrote:
>>
>>
>> Hey Alex,
>>
>> Basically, you should call start() exactly once; as you noticed, it sends
>> the initial schema message.
>>
>> If the VectorSchemaRoot is not stable, what you should do is create your
>> own root with the same schema, and use VectorUnloader/VectorLoader to
>> transfer data from the source root to the root used by Flight.
>>
>> Does that make sense? This would be good to add to the Arrow Java
>> cookbook (at least, the VectorLoader/Unloader part).
>>
>> -David
>>
>> --
>> *James Duong*
>> Lead Software Developer
>> Bit Quill Technologies Inc.
>> Direct: +1.604.562.6082 | jamesd@bitquilltech.com
>> https://www.bitquilltech.com
>>
>>
>> This email message is for the sole use of the intended recipient(s) and
>> may contain confidential and privileged information.  Any unauthorized
>> review, use, disclosure, or distribution is prohibited.  If you are not the
>> intended recipient, please contact the sender by reply email and destroy
>> all copies of the original message.  Thank you.
>>
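
The difference between the default write path and zero-copy writes described in the quoted replies can be illustrated without Flight at all. The sketch below uses java.nio.ByteBuffer as a stand-in for an Arrow buffer; writeByCopy and writeByReference are hypothetical helpers, not Flight APIs. It only aims to show why reusing a buffer is harmless after a copying write and hazardous after a write that merely keeps a reference.

```java
import java.nio.ByteBuffer;

/** Models a copying write versus a write that keeps a reference to the caller's memory. */
final class CopyVersusReference {

    /** Copying write: the bytes are duplicated, so the caller may reuse its buffer afterwards. */
    static ByteBuffer writeByCopy(ByteBuffer source) {
        ByteBuffer view = source.duplicate();
        view.clear();                         // position 0, limit = capacity; contents untouched
        ByteBuffer copy = ByteBuffer.allocate(view.remaining());
        copy.put(view);
        copy.flip();
        return copy;
    }

    /** Reference-keeping write: the returned buffer shares its content with the source. */
    static ByteBuffer writeByReference(ByteBuffer source) {
        return source.duplicate();
    }

    public static void main(String[] args) {
        ByteBuffer batch = ByteBuffer.allocate(Integer.BYTES);
        batch.putInt(0, 1);                   // first "batch"

        ByteBuffer copied = writeByCopy(batch);
        ByteBuffer referenced = writeByReference(batch);

        batch.putInt(0, 2);                   // reuse the same buffer for the next batch

        System.out.println("copied write still holds: " + copied.getInt(0));
        System.out.println("referenced write now holds: " + referenced.getInt(0));
    }
}
```

The copied value stays at 1 while the referenced value follows the source to 2, which is the "previously written data changes" effect the reply warns about for zero-copy writes.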

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by James Duong <ja...@bitquilltech.com>.
This was patching the application code (the FlightProducer implementation).

I wouldn't say this happened under heavy load, but it would consistently
happen with queries that returned enough data in one chunk that the client
would need to report being busy.

The behavior you're describing seems to indicate that the handlers should
run on any thread available to gRPC, but my observation was that the
handler would run on the thread that getStream was called on. I haven't
looked through the code to see if that is the case, though.

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
Hmm, you mean patching Flight itself? Or the application code? (Sounds like the latter?)

Just curious - were you seeing this hanging only under load, and with a fixed thread pool configured on the gRPC server? There is a pitfall there due to how gRPC is implemented (certain internal callbacks, including the one that sets the isCancelled flag IIRC, are run on the same thread pool as RPC handlers, so if 1) your RPC handlers are synchronous, 2) your thread pool has fixed capacity, and 3) all threads are servicing a call, then they can get stuck forever because the gRPC internal callbacks can never run).
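
The pitfall described above can be reproduced with a plain java.util.concurrent pool, without gRPC. In this sketch, rpcPool is a hypothetical stand-in for a fixed-size server executor, the latch stands in for a readiness signal delivered by an internal callback, and background stands in for an application-owned thread. None of these names come from gRPC or Flight, and the one-thread pool size and 500 ms timeout are arbitrary choices for the demonstration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Shows a synchronous handler starving a callback that shares its fixed-size pool. */
final class PoolStarvationSketch {

    /** Returns true if the handler observed the readiness signal before timing out. */
    static boolean handlerSeesReady(boolean offload) {
        ExecutorService rpcPool = Executors.newFixedThreadPool(1);          // every thread can be busy
        ExecutorService background = Executors.newSingleThreadExecutor();   // owned by the application
        CountDownLatch ready = new CountDownLatch(1);
        try {
            // The blocking part of the handler: wait until the peer is reported ready.
            Callable<Boolean> blockingWork = () -> ready.await(500, TimeUnit.MILLISECONDS);

            Future<Boolean> outcome;
            if (offload) {
                // The handler returns immediately after handing the blocking work off.
                Callable<Future<Boolean>> handler = () -> background.submit(blockingWork);
                outcome = rpcPool.submit(handler).get();
            } else {
                // The handler blocks on the pool thread itself.
                outcome = rpcPool.submit(blockingWork);
            }

            // The "internal callback" is queued on the same pool as the handler.
            Runnable internalCallback = ready::countDown;
            rpcPool.submit(internalCallback);

            return outcome.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            rpcPool.shutdownNow();
            background.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking on the pool thread: " + handlerSeesReady(false));
        System.out.println("offloaded to a background thread: " + handlerSeesReady(true));
    }
}
```

With the handler blocking on the only pool thread, the callback cannot run until the wait has timed out, so the first call returns false. Once the blocking wait is moved off the pool, the callback runs and the second call returns true, which matches the workaround reported earlier in the thread.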

On Fri, Mar 11, 2022, at 20:11, James Duong wrote:
> Following up on this, we were able to get by the hanging issue by changing the FlightProducer getStream() implementation to send its work (including the blocking on CallbackBackpressureStrategy) to a background thread instead of the gRPC thread running it.
> 
> On Tue, Mar 8, 2022 at 4:26 PM James Duong <ja...@bitquilltech.com> wrote:
>> We see backpressure related timeouts. I'm thinking there's an issue with CallbackBackpressureStrategy relying on ServerStreamListener#isReady().
>> I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.
>> 
>> We're going to try a fix for this locally then if it helps create a PR.
>> 
>> On Mon, Mar 7, 2022 at 4:19 PM David Li <li...@apache.org> wrote:
>>> __
>>> So you're finding that if you remove the backpressure handler, there are no problems?
>>> 
>>> Is the timeout a gRPC timeout? Do you know if any messages are making it through, or is it timing out after a period of no activity at all?
>>> 
>>> On Mon, Mar 7, 2022, at 19:12, Alex McRae (CW) wrote:
>>>> Hi David,
>>>> 
>>>> We believe we have a data race somewhere as debugging the code above causes no issues but running it without the debugger causes a timeout. We were trying to investigate if putNext kept around a reference to the data in the VectorSchemaRoot. Given that we are getting a timeout with the backpressure we think it is possible the code https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java may be the culprit.
>>>> 
>>>> On Fri, Mar 4, 2022 at 2:35 PM David Li <li...@apache.org> wrote:
>>>>> __
>>>>> It should be safe. Are you seeing any issues?
>>>>> 
>>>>> Flight waits for an explicit next()/putNext() to actually touch anything. And once they return, Flight will not read or mutate your data. So for instance, calling putNext() copies the Arrow data into a gRPC buffer, after which you can reuse the Arrow buffers. (This is *not* true if you have zero-copy writes enabled. In that case we poke a reference to the Arrow buffers into the gRPC data structure and so mutating your Arrow buffer will mysteriously change "previously written" data.)
>>>>> 
>>>>> It's been years since I've touched this, though, so the details here are fuzzy to me...
>>>>> 
>>>>> On Fri, Mar 4, 2022, at 16:38, Alex McRae (CW) wrote:
>>>>>> Hi all,
>>>>>> 
>>>>>> Related to this issue and solution below we were wondering if it is safe to call VectorLoader.load() before checking if a client is ready when using back pressure strategy. The thinking is that the client may still be reading data from the root and calling load() may cause a data race.
>>>>>> 
>>>>>> *public* void getStream(CallContext context, ServerStreamListener listener) {
>>>>>>       *final* FlightStream flightStream = *this*.client.getStream(ticket);
>>>>>> 
>>>>>>       VectorSchemaRoot clientRoot = flightStream.getRoot();
>>>>>>       VectorUnloader vectorUnloader = *new* VectorUnloader(clientRoot);
>>>>>> 
>>>>>>       *final* VectorSchemaRoot root = *new* VectorSchemaRoot(clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
>>>>>>       *final* VectorLoader vectorLoader = *new* VectorLoader(root);
>>>>>>       listener.start(root, flightStream.getDictionaryProvider());
>>>>>> 
>>>>>>       *while* (flightStream.next()) {
>>>>>>         *if* (!flightStream.hasRoot()) {
>>>>>>           *break*;
>>>>>>         }
>>>>>> 
>>>>>>         *if* (flightStream.getRoot() != clientRoot) {
>>>>>>           clientRoot = flightStream.getRoot();
>>>>>>           vectorUnloader = *new* VectorUnloader(clientRoot);
>>>>>>         }
>>>>>> 
>>>>>>         // is this safe to happen before the client is ready?
>>>>>>         vectorLoader.load(vectorUnloader.getRecordBatch());
>>>>>> 
>>>>>>         // this uses the build in CallBackpressureStrategy
>>>>>>         *final* BackpressureStrategy.WaitResult waitResult = backpressureStrategy.waitForListener(timeout);
>>>>>>         *if* (waitResult == BackpressureStrategy.WaitResult.READY) {
>>>>>>           listener.putNext();
>>>>>>         }
>>>>>>       }
>>>>>> 
>>>>>>       listener.completed();
>>>>>> }
>>>>>> 
>>>>>> Let me know what you think.
>>>>>> 
>>>>>> Sincerely,
>>>>>> 
>>>>>> Alex McRae
>>>>>> 
>>>>>>> On Feb 28, 2022, at 10:39 AM, Alex McRae (CW) <al...@dremio.com> wrote:
>>>>>>> 
>>>>>>> Absolutely!
>>>>>>> 
>>>>>>> On Mon, Feb 28, 2022 at 10:06 AM David Li <li...@apache.org> wrote:
>>>>>>>> __
>>>>>>>> Just to make sure this doesn't get forgotten: I filed https://github.com/apache/arrow-cookbook/issues/158 for providing an example of this.
>>>>>>>> 
>>>>>>>> -David
>>>>>>>> 
>>>>>>>> On Tue, Feb 15, 2022, at 13:54, David Li wrote:
>>>>>>>>> It should be safe. The fast-read path has pretty much always been enabled and I'm not aware of issues it causes. (The fast-read path simply calls an internal gRPC method to avoid bouncing through byte[], but we're still copying the data into Arrow, now that I look at it.)
>>>>>>>>> 
>>>>>>>>> The fast-write path is not relevant here, but that's the one that is trickier to use. We should make sure the optimization(s) are properly documented, since looking through it's not really explained what the consequences are (or at least the flag in ArrowMessage should reference setUseZeroCopy, and we should have a doc page for these env vars analogous to ARROW-15617 for C++.)
>>>>>>>>> 
>>>>>>>>> On a side note, digging around to refresh my memory shows that gRPC Java *finally* introduced a zero-copy Protobuf deserialization path. I'm not sure it's quite relevant for us, since we still need to get the data into an off-heap buffer in the end, but I need to take a closer look. (See grpc/grpc-java#8102.)
>>>>>>>>> 
>>>>>>>>> -David
>>>>>>>>> 
>>>>>>>>> On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
>>>>>>>>>> Thanks for the tip David.
>>>>>>>>>> 
>>>>>>>>>> Do you know if zero copy can be used safely on the ServerStreamListener when using the VectorUnloader/Loader pattern above?
>>>>>>>>>> 
>>>>>>>>>> On Mon, Feb 14, 2022 at 9:38 AM David Li <li...@apache.org> wrote:
>>>>>>>>>>> __
>>>>>>>>>>> Hey Alex,
>>>>>>>>>>> 
>>>>>>>>>>> Basically, you should call start() exactly once, as you noticed, it sends the initial schema message.
>>>>>>>>>>> 
>>>>>>>>>>> If the VectorSchemaRoot is not stable, what you should do is create your own root with the same schema, and use VectorUnloader/VectorLoader to transfer data from the source root to the root used by Flight.
>>>>>>>>>>> 
>>>>>>>>>>> Does that make sense? This would be good to add to the Arrow Java cookbook (at least, the VectorLoader/Unloader part).
>>>>>>>>>>> 
>>>>>>>>>>> -David
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>>>>>>>>>>> Hi team,
>>>>>>>>>>>> 
>>>>>>>>>>>> We are currently building a Flight service which proxies requests in Java. We are currently getting getStream working on the FlightProducer.
>>>>>>>>>>>> 
>>>>>>>>>>>> The code looks similar to this
>>>>>>>>>>>> 
>>>>>>>>>>>> public *void* getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>>>>     FlightStream stream = *this*.client.getStream(ticket);
>>>>>>>>>>>>     *while* (flightStream.next()) {
>>>>>>>>>>>>         *if* (!flightStream.hasRoot()) { *break*; }
>>>>>>>>>>>>         
>>>>>>>>>>>>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>>>>>>         listener.putNext();
>>>>>>>>>>>>     }
>>>>>>>>>>>>     listener.completed();
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> We are running into issues understanding if this is valid usage? I have looked at the OutBoundStreamListenerImpl.java file and it looks like calling start() on the listener causes it to resend some schema messages.
>>>>>>>>>>>> We are trying to understand how to handle the case where flightStream.getRoot() returns a different VectorSchemaRoot than the previous call.
>>>>>>>>>>>> 
>>>>>>>>>>>> For more context we have also tried
>>>>>>>>>>>> public *void* getStream(CallContext context, ServerStreamListener listener) {
>>>>>>>>>>>>     FlightStream flightStream = *this*.client.getStream(ticket);
>>>>>>>>>>>>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>>>>>>>>>>     *while* (flightStream.next()) {
>>>>>>>>>>>>         *if* (!flightStream.hasRoot()) { *break*; }
>>>>>>>>>>>>         
>>>>>>>>>>>>         listener.putNext();
>>>>>>>>>>>>     }
>>>>>>>>>>>>     listener.completed();
>>>>>>>>>>>> }
>>>>>>>>>>>> But ran into issues with the connection not closing, we believe this to be due to the VectorSchemaRoot changing on flightStream.next() calls. We believe this is a issue because we are sharing the root with both the FlightStream and ServerStreamListener.
>>>>>>>>>>>> https://github.com/dremio-hub/arrow-flight-client-examples is the client we are using to test this end to end.
>>>>>>>>>>>> 
>>>>>>>>>>>> Please let me know if you can provide any clarity, I would be happy to update the documentation afterwards.
>>>>>>>>>>>> 
>>>>>>>>>>>> Sincerely,
>>>>>>>>>>>> Alex McRae
>>>>>>>>>>>> alex.mcrae@dremio.com
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -- 
>>>>>>>>>> *James Duong*
>>>>>>>>>> Lead Software Developer
>>>>>>>>>> Bit Quill Technologies Inc.
>>>>>>>>>> Direct: +1.604.562.6082 | jamesd@bitquilltech.com
>>>>>>>>>> https://www.bitquilltech.com
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> This email message is for the sole use of the intended recipient(s) and may contain confidential and privileged information.  Any unauthorized review, use, disclosure, or distribution is prohibited.  If you are not the intended recipient, please contact the sender by reply email and destroy all copies of the original message.  Thank you.
>>>>>>>>> 
>>>>>>>> 
>>>>> 
>>> 
>> 
>> 
>> -- 
>> *James Duong*
>> Lead Software Developer
>> Bit Quill Technologies Inc.
>> Direct: +1.604.562.6082 | jamesd@bitquilltech.com
>> https://www.bitquilltech.com
>> 
>> 
>> This email message is for the sole use of the intended recipient(s) and may contain confidential and privileged information.  Any unauthorized review, use, disclosure, or distribution is prohibited.  If you are not the intended recipient, please contact the sender by reply email and destroy all copies of the original message.  Thank you.
> 
> 
> -- 
> *James Duong*
> Lead Software Developer
> Bit Quill Technologies Inc.
> Direct: +1.604.562.6082 | jamesd@bitquilltech.com
> https://www.bitquilltech.com
> 
> 
> This email message is for the sole use of the intended recipient(s) and may contain confidential and privileged information.  Any unauthorized review, use, disclosure, or distribution is prohibited.  If you are not the intended recipient, please contact the sender by reply email and destroy all copies of the original message.  Thank you.

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by James Duong <ja...@bitquilltech.com>.
Following up on this: we were able to work around the hang by changing
the FlightProducer getStream() implementation to hand its work (including
the blocking wait on CallbackBackpressureStrategy) to a background thread
instead of running it on the gRPC thread.
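
A minimal sketch of that hand-off, for illustration only. It does not use
Arrow Flight: the Listener interface below is a hypothetical stand-in for the
few ServerStreamListener calls involved, batches are plain strings, and the
returned future exists only so the demo can wait for the work to finish (the
real getStream() returns void).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackgroundGetStream {
    /** Hypothetical stand-in for the few ServerStreamListener calls used here. */
    public interface Listener {
        void putNext(String batch);
        void error(Throwable cause);
        void completed();
    }

    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Returns immediately; the loop runs on a pool thread, so a blocking
     * backpressure wait inside it would not stall the calling thread.
     */
    public CompletableFuture<Void> getStream(List<String> upstream, Listener listener) {
        return CompletableFuture.runAsync(() -> {
            try {
                for (String batch : upstream) {
                    // A real proxy would wait for the listener to be ready here.
                    listener.putNext(batch);
                }
                listener.completed();
            } catch (RuntimeException e) {
                listener.error(e);
            }
        }, executor);
    }

    public void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        BackgroundGetStream producer = new BackgroundGetStream();
        List<String> sent = new CopyOnWriteArrayList<>();
        Listener listener = new Listener() {
            @Override public void putNext(String batch) { sent.add(batch); }
            @Override public void error(Throwable cause) { sent.add("error: " + cause); }
            @Override public void completed() { sent.add("completed"); }
        };
        // join() is only for the demo; a server would simply return.
        producer.getStream(List.of("batch-0", "batch-1"), listener).join();
        producer.shutdown();
        System.out.println(sent);
    }
}
```

The point of the design is that the thread which invoked getStream() is free
again as soon as the task is submitted, while completed() or error() is still
called exactly once from the worker.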


-- 

*James Duong*
Lead Software Developer
Bit Quill Technologies Inc.
Direct: +1.604.562.6082 | jamesd@bitquilltech.com
https://www.bitquilltech.com

This email message is for the sole use of the intended recipient(s) and may
contain confidential and privileged information.  Any unauthorized review,
use, disclosure, or distribution is prohibited.  If you are not the
intended recipient, please contact the sender by reply email and destroy
all copies of the original message.  Thank you.

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by James Duong <ja...@bitquilltech.com>.
We see backpressure-related timeouts. I suspect there's an issue with
CallbackBackpressureStrategy relying on ServerStreamListener#isReady().
I've created https://issues.apache.org/jira/browse/ARROW-15876 for this.

We're going to try a fix locally and, if it helps, open a PR.
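
For readers unfamiliar with the pattern being discussed, here is a minimal
model of a callback-driven readiness wait with a timeout. It is neither the
library's CallbackBackpressureStrategy nor the fix tracked in ARROW-15876:
ReadyGate, its WaitResult enum, markReady() and markNotReady() are all
hypothetical names, and the "ready callback" is simulated with a plain thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReadyGate {
    /** Hypothetical result type; not the library's enum. */
    public enum WaitResult { READY, TIMEOUT, INTERRUPTED }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition readyChanged = lock.newCondition();
    private boolean ready;

    /** Intended to be called from the transport's "ready" callback. */
    public void markReady() {
        lock.lock();
        try {
            ready = true;
            readyChanged.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Intended to be called when the outbound buffer is full again. */
    public void markNotReady() {
        lock.lock();
        try {
            ready = false;
        } finally {
            lock.unlock();
        }
    }

    public WaitResult waitForReady(long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // The flag is checked while holding the lock, so a callback that
            // fires just before the wait starts is not lost.
            while (!ready) {
                if (remainingNanos <= 0) {
                    return WaitResult.TIMEOUT;
                }
                remainingNanos = readyChanged.awaitNanos(remainingNanos);
            }
            return WaitResult.READY;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return WaitResult.INTERRUPTED;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReadyGate gate = new ReadyGate();
        System.out.println(gate.waitForReady(50));    // nobody has signalled yet
        new Thread(gate::markReady).start();          // simulated ready callback
        System.out.println(gate.waitForReady(5_000));
    }
}
```

Note that the wait blocks its calling thread until the callback runs, so the
callback has to be delivered on a different thread than the one that is
waiting.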


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
So you're finding that if you remove the backpressure handler, there are no problems?

Is the timeout a gRPC timeout? Do you know if any messages are making it through, or is it timing out after a period of no activity at all?


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by "Alex McRae (CW)" <al...@dremio.com>.
Hi David,

We believe we have a data race somewhere: stepping through the code from my
previous message in a debugger causes no issues, but running it without the
debugger causes a timeout. We were trying to find out whether putNext() keeps
a reference to the data in the VectorSchemaRoot. Given that we are getting a
timeout with the backpressure strategy, we think it is possible that the
client code at
https://github.com/dremio-hub/arrow-flight-client-examples/tree/main/java
may be the culprit.


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
It should be safe. Are you seeing any issues?

Flight waits for an explicit next()/putNext() to actually touch anything. And once they return, Flight will not read or mutate your data. So for instance, calling putNext() copies the Arrow data into a gRPC buffer, after which you can reuse the Arrow buffers. (This is *not* true if you have zero-copy writes enabled. In that case we poke a reference to the Arrow buffers into the gRPC data structure and so mutating your Arrow buffer will mysteriously change "previously written" data.)

It's been years since I've touched this, though, so the details here are fuzzy to me...
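
As a rough sketch of the pattern discussed in this thread (call start() once on a root the proxy owns, then copy each upstream batch into that root with VectorUnloader/VectorLoader before calling putNext()), the loop might look like the following. The class name ProxyStreamSketch, the helper copyBatch, and the parameters allocator and timeoutMillis are made up for illustration. The allocator is assumed to share a root allocator with the one the FlightClient uses, and zero-copy writes are assumed to be left disabled, so that putNext() has copied the data before the root is reloaded. Treating every non-READY wait result as an error is just one possible policy, not something the thread prescribes.

```java
import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;

/** Illustrative only: forwards an upstream FlightStream to a downstream listener. */
public final class ProxyStreamSketch {

  /** Moves the current contents of {@code source} into the root behind {@code target}. */
  static void copyBatch(VectorSchemaRoot source, VectorLoader target) {
    VectorUnloader unloader = new VectorUnloader(source);
    // The record batch holds references to the source buffers; close it once loaded.
    try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
      target.load(batch);
    }
  }

  static void forward(
      FlightStream upstream,
      ServerStreamListener listener,
      BufferAllocator allocator, // assumption: shares a root with the client's allocator
      long timeoutMillis) {
    BackpressureStrategy.CallbackBackpressureStrategy backpressure =
        new BackpressureStrategy.CallbackBackpressureStrategy();
    backpressure.register(listener);

    // A root owned by the proxy, so its identity never changes between batches.
    try (VectorSchemaRoot outRoot = VectorSchemaRoot.create(upstream.getSchema(), allocator)) {
      VectorLoader loader = new VectorLoader(outRoot);

      // start() sends the schema, so it is called exactly once.
      listener.start(outRoot, upstream.getDictionaryProvider());

      while (upstream.next()) {
        if (!upstream.hasRoot()) {
          break;
        }
        // Re-read getRoot() every time in case the stream handed out a new root.
        copyBatch(upstream.getRoot(), loader);

        BackpressureStrategy.WaitResult result = backpressure.waitForListener(timeoutMillis);
        if (result != BackpressureStrategy.WaitResult.READY) {
          // Assumed policy: fail the call rather than silently dropping the batch.
          listener.error(new IllegalStateException("Downstream client not ready: " + result));
          return;
        }
        listener.putNext();
      }
      listener.completed();
    }
  }
}
```

The caller would remain responsible for closing the FlightStream once forward() returns.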


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by "Alex McRae (CW)" <al...@dremio.com>.
Hi all,

Related to this issue and the solution David suggested, we were wondering whether it is safe to call VectorLoader.load() before checking that the client is ready when using a backpressure strategy. Our concern is that the client may still be reading data from the root, so calling load() may cause a data race.

public void getStream(CallContext context, ServerStreamListener listener) {
      final FlightStream flightStream = this.client.getStream(ticket);

      VectorSchemaRoot clientRoot = flightStream.getRoot();
      VectorUnloader vectorUnloader = new VectorUnloader(clientRoot);

      final VectorSchemaRoot root = new VectorSchemaRoot(clientRoot.getSchema(), clientRoot.getFieldVectors(), clientRoot.getRowCount());
      final VectorLoader vectorLoader = new VectorLoader(root);
      listener.start(root, flightStream.getDictionaryProvider());

      while (flightStream.next()) {
        if (!flightStream.hasRoot()) {
          break;
        }

        if (flightStream.getRoot() != clientRoot) {
          clientRoot = flightStream.getRoot();
          vectorUnloader = new VectorUnloader(clientRoot);
        }

        // is this safe to happen before the client is ready?
        vectorLoader.load(vectorUnloader.getRecordBatch());

        // this uses the built-in CallbackBackpressureStrategy
        final BackpressureStrategy.WaitResult waitResult = backpressureStrategy.waitForListener(timeout);
        if (waitResult == BackpressureStrategy.WaitResult.READY) {
          listener.putNext();
        }
      }

      listener.completed();
}
Let me know what you think.

Sincerely,

Alex McRae



Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by "Alex McRae (CW)" <al...@dremio.com>.
Absolutely!


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
Just to make sure this doesn't get forgotten: I filed https://github.com/apache/arrow-cookbook/issues/158 for providing an example of this.

-David


Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
It should be safe. The fast-read path has pretty much always been enabled and I'm not aware of issues it causes. (The fast-read path simply calls an internal gRPC method to avoid bouncing through byte[], but we're still copying the data into Arrow, now that I look at it.)

The fast-write path is not relevant here, but that's the one that is trickier to use. We should make sure the optimization(s) are properly documented, since, looking through the code, the consequences aren't really explained (or at least the flag in ArrowMessage should reference setUseZeroCopy, and we should have a doc page for these env vars, analogous to ARROW-15617 for C++).

On a side note, digging around to refresh my memory shows that gRPC Java *finally* introduced a zero-copy Protobuf deserialization path. I'm not sure it's quite relevant for us, since we still need to get the data into an off-heap buffer in the end, but I need to take a closer look. (See grpc/grpc-java#8102.)

-David

On Tue, Feb 15, 2022, at 13:12, James Duong wrote:
> Thanks for the tip David.
> 
> Do you know if zero copy can be used safely on the ServerStreamListener when using the VectorUnloader/Loader pattern above?
> 
> On Mon, Feb 14, 2022 at 9:38 AM David Li <li...@apache.org> wrote:
>> Hey Alex,
>> 
>> Basically, you should call start() exactly once; as you noticed, it sends the initial schema message.
>> 
>> If the VectorSchemaRoot is not stable, what you should do is create your own root with the same schema, and use VectorUnloader/VectorLoader to transfer data from the source root to the root used by Flight.
>> 
>> Does that make sense? This would be good to add to the Arrow Java cookbook (at least, the VectorLoader/Unloader part).
>> 
>> -David
>> 
>> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>>> Hi team,
>>> 
>>> We are currently building a Flight service which proxies requests in Java. We are currently getting getStream working on the FlightProducer.
>>> 
>>> The code looks similar to this
>>> 
>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>     while (flightStream.next()) {
>>>         if (!flightStream.hasRoot()) { break; }
>>>         
>>>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>         listener.putNext();
>>>     }
>>>     listener.completed();
>>> }
>>> 
>>> 
>>> We are having trouble understanding whether this is valid usage. I have looked at the OutBoundStreamListenerImpl.java file, and it looks like calling start() on the listener causes it to resend some schema messages.
>>> We are trying to understand how to handle the case where flightStream.getRoot() returns a different VectorSchemaRoot than the previous call.
>>> 
>>> For more context we have also tried
>>> public void getStream(CallContext context, ServerStreamListener listener) {
>>>     FlightStream flightStream = this.client.getStream(ticket);
>>>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>>>     while (flightStream.next()) {
>>>         if (!flightStream.hasRoot()) { break; }
>>>         
>>>         listener.putNext();
>>>     }
>>>     listener.completed();
>>> }
>>> But we ran into issues with the connection not closing. We believe this is due to the VectorSchemaRoot changing on flightStream.next() calls, which is an issue because we are sharing the root between the FlightStream and the ServerStreamListener.
>>> https://github.com/dremio-hub/arrow-flight-client-examples is the client we are using to test this end to end.
>>> 
>>> Please let me know if you can provide any clarity, I would be happy to update the documentation afterwards.
>>> 
>>> Sincerely,
>>> Alex McRae
>>> alex.mcrae@dremio.com
>> 
> 
> 
> -- 
> *James Duong*
> Lead Software Developer
> Bit Quill Technologies Inc.
> Direct: +1.604.562.6082 | jamesd@bitquilltech.com
> https://www.bitquilltech.com
> 

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by James Duong <ja...@bitquilltech.com>.
Thanks for the tip David.

Do you know if zero copy can be used safely on the ServerStreamListener
when using the VectorUnloader/Loader pattern above?

On Mon, Feb 14, 2022 at 9:38 AM David Li <li...@apache.org> wrote:

> Hey Alex,
>
> Basically, you should call start() exactly once; as you noticed, it sends
> the initial schema message.
>
> If the VectorSchemaRoot is not stable, what you should do is create your
> own root with the same schema, and use VectorUnloader/VectorLoader to
> transfer data from the source root to the root used by Flight.
>
> Does that make sense? This would be good to add to the Arrow Java cookbook
> (at least, the VectorLoader/Unloader part).
>
> -David
>
> On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
>
> Hi team,
>
> We are currently building a Flight service which proxies requests in Java.
> We are currently getting getStream working on the FlightProducer.
>
> The code looks similar to this
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>         listener.putNext();
>     }
>
>     listener.completed();
>
> }
>
>
> We are having trouble understanding whether this is valid usage. I have
> looked at the OutBoundStreamListenerImpl.java file, and it looks like
> calling start() on the listener causes it to resend some schema messages.
> We are trying to understand how to handle the case where
> flightStream.getRoot() returns a different VectorSchemaRoot than the
> previous call.
>
> For more context we have also tried
>
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>
>         listener.putNext();
>     }
>     listener.completed();
> }
>
> But we ran into issues with the connection not closing. We believe this is
> due to the VectorSchemaRoot changing on flightStream.next() calls, which is
> an issue because we are sharing the root between the FlightStream and the
> ServerStreamListener.
> https://github.com/dremio-hub/arrow-flight-client-examples is the client
> we are using to test this end to end.
>
> Please let me know if you can provide any clarity, I would be happy to
> update the documentation afterwards.
>
> Sincerely,
> Alex McRae
> alex.mcrae@dremio.com
>
>
>

-- 

*James Duong*
Lead Software Developer
Bit Quill Technologies Inc.
Direct: +1.604.562.6082 | jamesd@bitquilltech.com
https://www.bitquilltech.com

Re: Correct usage of listener.start() when root is shared with FlightStream

Posted by David Li <li...@apache.org>.
Hey Alex,

Basically, you should call start() exactly once; as you noticed, it sends the initial schema message.

If the VectorSchemaRoot is not stable, what you should do is create your own root with the same schema, and use VectorUnloader/VectorLoader to transfer data from the source root to the root used by Flight.

Does that make sense? This would be good to add to the Arrow Java cookbook (at least, the VectorLoader/Unloader part).
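
A minimal, dependency-free sketch of the control flow described above: call start() once on a root that the proxy owns, then copy every incoming batch into that root before each putNext(). The Root, SourceStream, and Listener classes and the transfer() helper are hypothetical stand-ins (they are not Arrow classes) for VectorSchemaRoot, FlightStream, ServerStreamListener, and the VectorUnloader/VectorLoader pair, so the example runs on a plain JDK. In a real service the stable root would presumably be created with VectorSchemaRoot.create(flightStream.getSchema(), allocator), and each batch moved with new VectorUnloader(flightStream.getRoot()).getRecordBatch() followed by VectorLoader.load(batch).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ProxySketch {

    /** Hypothetical stand-in for VectorSchemaRoot: a mutable holder for one batch. */
    static final class Root {
        final List<Integer> values = new ArrayList<>();
    }

    /** Hypothetical stand-in for FlightStream whose root is replaced on every next(). */
    static final class SourceStream {
        private final Iterator<List<Integer>> batches;
        private Root current = new Root();

        SourceStream(List<List<Integer>> batches) {
            this.batches = batches.iterator();
        }

        boolean next() {
            if (!batches.hasNext()) {
                return false;
            }
            current = new Root(); // models the unstable-root case from the thread
            current.values.addAll(batches.next());
            return true;
        }

        Root getRoot() {
            return current;
        }
    }

    /** Hypothetical stand-in for ServerStreamListener: keeps the root given to start(). */
    static final class Listener {
        private Root root;
        private int startCalls = 0;
        final List<List<Integer>> sent = new ArrayList<>();

        void start(Root root) {
            this.root = root;
            startCalls++;
        }

        void putNext() {
            // Sends whatever the root passed to start() currently holds.
            sent.add(new ArrayList<>(root.values));
        }

        int startCalls() {
            return startCalls;
        }
    }

    /** Hypothetical stand-in for VectorUnloader + VectorLoader: copy one batch across. */
    static void transfer(Root from, Root to) {
        to.values.clear();
        to.values.addAll(from.values);
    }

    /** Relays all batches through a proxy-owned root, calling start() exactly once. */
    static Listener relay(List<List<Integer>> batches) {
        SourceStream source = new SourceStream(batches);
        Root stableRoot = new Root(); // owned by the proxy, never replaced
        Listener listener = new Listener();
        listener.start(stableRoot); // once, before the loop
        while (source.next()) {
            transfer(source.getRoot(), stableRoot);
            listener.putNext();
        }
        return listener;
    }

    public static void main(String[] args) {
        Listener listener = relay(List.of(List.of(1, 2), List.of(3)));
        System.out.println(listener.sent + ", start() calls: " + listener.startCalls());
    }
}
```

Because the listener only ever sees stableRoot, it no longer matters whether the source replaces its root between batches.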

-David

On Mon, Feb 14, 2022, at 12:27, Alex McRae (CW) wrote:
> Hi team,
> 
> We are currently building a Flight service which proxies requests in Java. We are currently getting getStream working on the FlightProducer.
> 
> The code looks similar to this
> 
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>         
>         listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>         listener.putNext();
>     }
>     listener.completed();
> }
> 
> 
> We are having trouble understanding whether this is valid usage. I have looked at the OutBoundStreamListenerImpl.java file, and it looks like calling start() on the listener causes it to resend some schema messages.
> We are trying to understand how to handle the case where flightStream.getRoot() returns a different VectorSchemaRoot than the previous call.
> 
> For more context we have also tried
> public void getStream(CallContext context, ServerStreamListener listener) {
>     FlightStream flightStream = this.client.getStream(ticket);
>     listener.start(flightStream.getRoot(), flightStream.getDictionaryProvider());
>     while (flightStream.next()) {
>         if (!flightStream.hasRoot()) { break; }
>         
>         listener.putNext();
>     }
>     listener.completed();
> }
> But we ran into issues with the connection not closing. We believe this is due to the VectorSchemaRoot changing on flightStream.next() calls, which is an issue because we are sharing the root between the FlightStream and the ServerStreamListener.
> https://github.com/dremio-hub/arrow-flight-client-examples is the client we are using to test this end to end.
> 
> Please let me know if you can provide any clarity, I would be happy to update the documentation afterwards.
> 
> Sincerely,
> Alex McRae
> alex.mcrae@dremio.com