Posted to user@arrow.apache.org by Haocheng Liu <lb...@gmail.com> on 2023/05/18 22:25:44 UTC

[C++][Parquet] Extend RecordBatchReader to write Parquet from streaming data

Hi,

I have a question about using the Acero push model to write streaming
data as hive-partitioned Parquet in a single-threaded program. Can anyone
advise on the best practice here and whether my understanding below is
correct:

   - I receive streaming data via a callback function which gives me data
   row by row. To the best of my knowledge, subclassing RecordBatchReader is
   preferred?
   - Should I batch a fixed number of rows in some in-memory data structure
   first, then flush them to Acero? Then how would Acero know it's time to
   push data in the ReadNext
   <https://arrow.apache.org/docs/cpp/api/table.html#_CPPv4N5arrow17RecordBatchReader8ReadNextEPNSt10shared_ptrI11RecordBatchEE>
   function?

I'm not clear on how to connect a callback function from streaming data
with the Acero push model. Any suggestions will be appreciated.


Thanks.

Best,
Haocheng

Re: [C++][Parquet] Extend RecordBatchReader to write Parquet from streaming data

Posted by Haocheng Liu <lb...@gmail.com>.
Update:
I believe my previous questions are all answered in the example
<https://gist.github.com/westonpace/6f7fdbdc0399501418101851d75091c4> you
provided with the condition_variable.

Thanks for your help Weston 👍

-- 
Best regards

Re: [C++][Parquet] Extend RecordBatchReader to write Parquet from streaming data

Posted by Haocheng Liu <lb...@gmail.com>.
Hi Weston,

Thanks for the detailed reply! I will explore the example you provided.

> - You should block until sufficient data is available.
Can you please elaborate on how to block in a RecordBatchReader subclass?

Will downstream nodes only pull when Flush(...) is called?
```
std::shared_ptr<arrow::RecordBatch> batch;
ARROW_ASSIGN_OR_RAISE(batch, batch_builder->Flush());
```

Thanks in advance.

Regards,
Haocheng


-- 
Best regards

Re: [C++][Parquet] Extend RecordBatchReader to write Parquet from streaming data

Posted by Weston Pace <we...@gmail.com>.
> Can anyone advise on the best practice here and whether my
understanding below is correct:

Here's an example:
https://gist.github.com/westonpace/6f7fdbdc0399501418101851d75091c4

> I receive streaming data via a callback function which gives me data row
by row. To the best of my knowledge, subclassing RecordBatchReader is preferred?

RecordBatchReader is pull-based.  A friendly push-based source node for
Acero would be a good idea, but doesn't exist.  You can make one using a
PushGenerator but that might be complicated.  In the meantime, subclassing
RecordBatchReader is probably simplest.

> Should I batch a fixed number of rows in some in-memory data structure
first, then flush them to Acero?

Yes, for performance.
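
To illustrate the batching step, here is a minimal sketch of accumulating
callback rows into fixed-size batches. The `Row` struct, the `RowBatcher`
name, and the batch size are all hypothetical; in a real program you would
hand a full batch to an Arrow builder and emit an `arrow::RecordBatch`
rather than a `std::vector`:

```cpp
#include <cstddef>
#include <vector>

// Hypothetical row type; in practice this is whatever your callback delivers.
struct Row { double value; };

// Accumulates rows and hands back a full batch once `batch_size` rows arrive.
class RowBatcher {
 public:
  explicit RowBatcher(std::size_t batch_size) : batch_size_(batch_size) {}

  // Returns true and fills `out` when a full batch is ready; false otherwise.
  bool Push(const Row& row, std::vector<Row>* out) {
    pending_.push_back(row);
    if (pending_.size() < batch_size_) return false;
    out->swap(pending_);   // hand the completed batch to the caller
    pending_.clear();      // discard whatever the swap left behind
    return true;
  }

 private:
  std::size_t batch_size_;
  std::vector<Row> pending_;
};
```

The callback would call `Push` for every incoming row and only forward a
batch downstream when `Push` returns true.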

> Then how would Acero know it's time to push data in the ReadNext
<https://arrow.apache.org/docs/cpp/api/table.html#_CPPv4N5arrow17RecordBatchReader8ReadNextEPNSt10shared_ptrI11RecordBatchEE>
function?

ReadNext is pull-based.  Acero will continually call ReadNext.  You should
block until sufficient data is available.
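
To make the blocking concrete, here is a minimal, Arrow-free sketch of the
same condition-variable pattern the gist uses. `Batch` and `BatchQueue` are
stand-ins: in a real RecordBatchReader subclass, the callback thread would
call `Push`, and `ReadNext` would do what `Pop` does here, setting `*out`
to nullptr at end of stream:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Illustrative stand-in for std::shared_ptr<arrow::RecordBatch>.
using Batch = std::vector<double>;

class BatchQueue {
 public:
  // Called from the callback thread once a full batch is assembled.
  void Push(Batch batch) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(batch));
    }
    cv_.notify_one();
  }

  // Called when the stream ends, so a blocked reader can wake up and finish.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // What ReadNext would call: blocks until a batch arrives or the stream
  // closes. Returns std::nullopt at end of stream.
  std::optional<Batch> Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return std::nullopt;  // closed and drained
    Batch front = std::move(queue_.front());
    queue_.pop_front();
    return front;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Batch> queue_;
  bool closed_ = false;
};
```

The key point is that `Pop` (i.e. ReadNext) simply waits on the condition
variable, so Acero's pull naturally pauses until the push side has data.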

> I have a question on how to use the Acero push model to write streaming
data as hive-partitioned Parquet in a single-threaded program

The example I gave will use one thread in addition to the main thread.
Doing something without any additional threads at all would be possible,
but would probably require knowing more about the structure of your program.
