Posted to dev@arrow.apache.org by Chris Osborn <cs...@gmail.com> on 2020/06/25 18:05:22 UTC

Arrow for low-latency streaming of small batches?

Hi,

I am investigating Arrow for a project that needs to transfer records from
a producer to one or more consumers in small batches (median batch size is
1) and with low latency. The usual structure for something like this would
be a single producer multi-consumer queue*. Is there any sane way to use
Arrow in this fashion? I have a little C++ prototype that works, but it
does the following for each batch of rows:

Producer side:
    1. construct a set of builders
    2. append a value to each builder for each record in the batch
    3. finish the builders and use them to make a RecordBatch
    4. append the RecordBatch to a vector

Consumer side:
    1. construct a Table from the vector of RecordBatches
    2. slice out the part of the table that the consumer requires (each
consumer keeps its own offset)
    3. read the data from the resulting sliced table

Considering how much work this has to do it performs better than I would
have expected, but there's definitely a big fixed cost for each batch of
rows (constructing and destructing builders, making Tables that can only be
used once since they're immutable, etc). If the batches weren't so small it
would probably make sense, but as is it's unworkable. I need to add rows to
logical "tables" thousands of times per second in aggregate.

Am I just too far from Arrow's big data sweet spot, or is there something
I'm missing? I keep reading about IPC and streaming of Arrow data, but I
can't find a way to use it at such fine granularity. Thanks in advance for
any insights!

Thanks!
Chris Osborn


* yes, I can just use a queue, but the promise of a uniform memory layout
that is simultaneously accessible to C++ and Python is very compelling

Re: Arrow for low-latency streaming of small batches?

Posted by Christian Hudon <ch...@elementai.com>.
Very interesting. This is something that I would potentially also be
interested in, so if there were some code available out there, I could
potentially contribute or at least use. At least, I'd love for something
that allows Arrow to work with both larger and very small record batches (a
few rows) in a seamless and efficient way to make it into the Arrow
codebase.

On Mon, Jun 29, 2020 at 5:05 PM, Wes McKinney <we...@gmail.com> wrote:

> On Fri, Jun 26, 2020 at 8:56 AM Chris Osborn <cs...@fb.com.invalid>
> wrote:
> >
> > Yes, it would be quite feasible to preallocate a region large enough for
> > several thousand rows for each column, assuming I read from that region
> > while it's still filling in. When that region is full, I could either
> > allocate a new big chunk or loop around if I no longer need the data. I'm
> > now doing something like that in a revised prototype. Specifically I'm
> > creating builders and calling Reserve() once up front to get a large
> > region, which I then fill in with multiple batches. As the producer fills
> > it in using ArrayBuilder::Append(), the consumers read out earlier rows
> > using ArrayBuilder::GetValue(). This works, but I'm clearly going against
> > the spirit of the library by using builders as ersatz Arrays and a set of
> > builders in lieu of a Table.
> >
> > In short, it's feasible (and preferable) to preallocate the memory
> > needed, whether it's the builders' memory or the RecordBatch/Table's memory
> > (ideally that's the same thing?). I just haven't been able to figure out
> > how to do that gracefully.
>
> By following the columnar format's buffer layouts [1] it should be
> straightforward to compute the size of a memory region to preallocate
> that represents a RecordBatch's memory and then construct the Buffer
> and ArrayData objects that reference each constituent buffer, and then
> create a RecordBatch from those ArrayData objects. Some assumptions
> must be made of course:
>
> * If a field is nullable, then an empty validity bitmap must be
> preallocated (and you can initialize it to all valid or all null based
> on what your application prefers)
> * Must decide what to do about variable-size allocations for
> binary/string types (and extrapolating, analogously for list types if
> you have Array/List-like data). So if you preallocated a region that
> can accommodate 1024 values then you might allocate 32KB data buffers
> for string data (or some factor of the length if you have bigger
> strings). If you fill up the data buffer then you will have to move on
> to the next region. Another approach might be to let the string data
> buffer be a separate ResizableBuffer that you reallocate when you need
> to make it bigger
>
> I could envision creating a C++ implementation to manage this whole
> process that becomes a part of the Arrow C++ codebase -- preallocate
> memory given some global / field-level options and then provide
> effectively "UnsafeAppend" APIs to write data into the preallocated
> region.
>
> If you create a "parent" RecordBatch that references the preallocated
> memory then you can use `RecordBatch::Slice` to "chop" off the filled
> portion to pass to your consumer.
>
> [1]:
> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#buffer-listing-for-each-layout
>


-- 
Christian Hudon
Applied Research Scientist
Element AI, 6650 Saint-Urbain #500
Montréal, QC, H2S 3G9, Canada
Elementai.com

Re: Arrow for low-latency streaming of small batches?

Posted by Wes McKinney <we...@gmail.com>.
On Fri, Jun 26, 2020 at 8:56 AM Chris Osborn <cs...@fb.com.invalid> wrote:
>
> Yes, it would be quite feasible to preallocate a region large enough for several thousand rows for each column, assuming I read from that region while it's still filling in. When that region is full, I could either allocate a new big chunk or loop around if I no longer need the data. I'm now doing something like that in a revised prototype. Specifically I'm creating builders and calling Reserve() once up front to get a large region, which I then fill in with multiple batches. As the producer fills it in using ArrayBuilder::Append(), the consumers read out earlier rows using ArrayBuilder::GetValue(). This works, but I'm clearly going against the spirit of the library by using builders as ersatz Arrays and a set of builders in lieu of a Table.
>
> In short, it's feasible (and preferable) to preallocate the memory needed, whether it's the builders' memory or the RecordBatch/Table's memory (ideally that's the same thing?). I just haven't been able to figure out how to do that gracefully.

By following the columnar format's buffer layouts [1] it should be
straightforward to compute the size of a memory region to preallocate
that represents a RecordBatch's memory and then construct the Buffer
and ArrayData objects that reference each constituent buffer, and then
create a RecordBatch from those ArrayData objects. Some assumptions
must be made of course:

* If a field is nullable, then an empty validity bitmap must be
preallocated (and you can initialize it to all valid or all null based
on what your application prefers)
* Must decide what to do about variable-size allocations for
binary/string types (and extrapolating, analogously for list types if
you have Array/List-like data). So if you preallocated a region that
can accommodate 1024 values then you might allocate 32KB data buffers
for string data (or some factor of the length if you have bigger
strings). If you fill up the data buffer then you will have to move on
to the next region. Another approach might be to let the string data
buffer be a separate ResizableBuffer that you reallocate when you need
to make it bigger
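
As a rough illustration of the sizing arithmetic behind the assumptions
above, the sketch below computes per-buffer byte counts for a region with a
given row capacity. It assumes 32-bit offsets for string columns and pads
each buffer to a multiple of 64 bytes, as the columnar format document
recommends. The function names are made up for this example and are not
Arrow APIs.

```cpp
#include <cassert>
#include <cstdint>

// Round a byte count up to a multiple of 64 (the padding the columnar
// format document recommends for buffers).
constexpr int64_t PadTo64(int64_t nbytes) { return (nbytes + 63) / 64 * 64; }

// Validity bitmap: one bit per value, rounded up to whole bytes.
constexpr int64_t BitmapBytes(int64_t capacity) {
  return PadTo64((capacity + 7) / 8);
}

// Data buffer of a fixed-width column (byte_width is 8 for int64).
constexpr int64_t FixedWidthBytes(int64_t capacity, int64_t byte_width) {
  return PadTo64(capacity * byte_width);
}

// Offsets buffer of a string/binary column with 32-bit offsets:
// capacity + 1 entries, because value i spans offsets[i]..offsets[i + 1].
constexpr int64_t OffsetsBytes(int64_t capacity) {
  return PadTo64((capacity + 1) * static_cast<int64_t>(sizeof(int32_t)));
}

// Data buffer of a string column. bytes_per_value is an application-level
// guess (an assumption of this sketch), not something Arrow can know.
constexpr int64_t StringDataBytes(int64_t capacity, int64_t bytes_per_value) {
  return PadTo64(capacity * bytes_per_value);
}

// Total bytes for one nullable string column of the given capacity.
constexpr int64_t NullableStringColumnBytes(int64_t capacity,
                                            int64_t bytes_per_value) {
  return BitmapBytes(capacity) + OffsetsBytes(capacity) +
         StringDataBytes(capacity, bytes_per_value);
}
```

For a capacity of 1024 rows this gives 128 bytes for a validity bitmap,
8192 bytes for an int64 data buffer, 4160 bytes for string offsets, and
32768 bytes (32KB) of string data at an assumed 32 bytes per value.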

I could envision creating a C++ implementation to manage this whole
process that becomes a part of the Arrow C++ codebase -- preallocate
memory given some global / field-level options and then provide
effectively "UnsafeAppend" APIs to write data into the preallocated
region.
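
To make the idea of unchecked appends into a preallocated region concrete,
here is a minimal standard-library sketch of one int64 column that a single
producer fills while consumers read the rows published so far. It is only an
illustration under stated assumptions: the class and function names are
hypothetical, nothing here is Arrow code, and it covers a single
non-nullable column rather than a whole RecordBatch.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical fixed-capacity column. Storage is allocated once up front;
// one producer appends, any number of consumers read published rows.
class PreallocatedInt64Column {
 public:
  explicit PreallocatedInt64Column(std::size_t capacity)
      : values_(capacity), length_(0) {}

  std::size_t capacity() const { return values_.size(); }

  // Number of rows visible to consumers. The acquire load pairs with the
  // release store in Publish(), so rows below this index are fully written.
  std::size_t length() const { return length_.load(std::memory_order_acquire); }

  // Producer only. No bounds check: the caller guarantees that fewer than
  // capacity() rows have been written. Not visible until Publish().
  void UnsafeAppend(int64_t v) { values_[written_++] = v; }

  // Producer only. Makes every row written so far visible to consumers.
  void Publish() { length_.store(written_, std::memory_order_release); }

  // Consumer. Only valid for i < length().
  int64_t Value(std::size_t i) const { return values_[i]; }

 private:
  std::vector<int64_t> values_;
  std::size_t written_ = 0;          // touched by the producer only
  std::atomic<std::size_t> length_;  // shared with consumers
};

// Consumer-side helper: sums the rows in [*offset, length()) and advances
// the consumer's private offset past them.
int64_t DrainSum(const PreallocatedInt64Column& col, std::size_t* offset) {
  const std::size_t end = col.length();
  int64_t sum = 0;
  for (std::size_t i = *offset; i < end; ++i) sum += col.Value(i);
  *offset = end;
  return sum;
}
```

Publishing the length separately from writing the values lets the producer
write a whole batch (even a batch of one) and expose it with a single store,
while each consumer tracks nothing but its own offset.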

If you create a "parent" RecordBatch that references the preallocated
memory then you can use `RecordBatch::Slice` to "chop" off the filled
portion to pass to your consumer.
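
The reason slicing is cheap is that a slice only records an offset and a
length and shares the parent's buffers; no values are copied. The sketch
below models that with the standard library alone. Int64Slice is a
hypothetical stand-in for this example, not Arrow's RecordBatch or
ArrayData, and it covers a single int64 column.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical zero-copy view over a shared, preallocated column.
struct Int64Slice {
  std::shared_ptr<const std::vector<int64_t>> storage;  // shared, never copied
  std::size_t offset;  // first row of the view within storage
  std::size_t length;  // number of rows in the view

  // Row i of the view (caller keeps i < length).
  int64_t operator[](std::size_t i) const { return (*storage)[offset + i]; }

  // A sub-view relative to this view. Only the offset and length change;
  // the caller keeps off + len <= length.
  Int64Slice Slice(std::size_t off, std::size_t len) const {
    return Int64Slice{storage, offset + off, len};
  }
};

// The rows a consumer has not seen yet: everything between its private
// offset and the number of rows the producer has filled so far.
Int64Slice UnreadRows(const Int64Slice& parent, std::size_t consumer_offset,
                      std::size_t filled) {
  return parent.Slice(consumer_offset, filled - consumer_offset);
}
```

With a parent view over the whole preallocated region, a consumer that has
already read two rows when five are filled would call
UnreadRows(parent, 2, 5) and receive a three-row view over the same storage.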

[1]: https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#buffer-listing-for-each-layout


Re: Arrow for low-latency streaming of small batches?

Posted by Chris Osborn <cs...@fb.com.INVALID>.
Yes, it would be quite feasible to preallocate a region large enough for several thousand rows for each column, assuming I read from that region while it's still filling in. When that region is full, I could either allocate a new big chunk or loop around if I no longer need the data. I'm now doing something like that in a revised prototype. Specifically I'm creating builders and calling Reserve() once up front to get a large region, which I then fill in with multiple batches. As the producer fills it in using ArrayBuilder::Append(), the consumers read out earlier rows using ArrayBuilder::GetValue(). This works, but I'm clearly going against the spirit of the library by using builders as ersatz Arrays and a set of builders in lieu of a Table.

In short, it's feasible (and preferable) to preallocate the memory needed, whether it's the builders' memory or the RecordBatch/Table's memory (ideally that's the same thing?). I just haven't been able to figure out how to do that gracefully.

Thanks!
Chris Osborn
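
For the "allocate a new big chunk when the region is full" option described
above, a minimal single-threaded sketch could look like the following. The
names are hypothetical and it uses plain standard-library containers rather
than Arrow builders. It also assumes readers and the writer do not run
concurrently, because growing the outer vector could move the chunk list
while a reader is using it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical append-only int64 column made of fixed-size chunks. Each
// chunk is allocated once, in full, when the previous one fills up.
class ChunkedInt64Column {
 public:
  explicit ChunkedInt64Column(std::size_t rows_per_chunk)
      : rows_per_chunk_(rows_per_chunk) {}

  void Append(int64_t v) {
    // All existing chunks are full (or there are none): preallocate the
    // next region before writing.
    if (size_ == chunks_.size() * rows_per_chunk_) {
      chunks_.emplace_back(rows_per_chunk_);
    }
    chunks_[size_ / rows_per_chunk_][size_ % rows_per_chunk_] = v;
    ++size_;
  }

  // A global row index maps to (chunk, row within chunk), so each consumer
  // can keep one plain offset across chunk boundaries.
  int64_t Value(std::size_t row) const {
    return chunks_[row / rows_per_chunk_][row % rows_per_chunk_];
  }

  std::size_t size() const { return size_; }
  std::size_t num_chunks() const { return chunks_.size(); }

 private:
  std::size_t rows_per_chunk_;
  std::size_t size_ = 0;
  std::vector<std::vector<int64_t>> chunks_;
};
```

The "loop around" variant would instead reuse the oldest chunk once every
consumer's offset has moved past it; that bookkeeping is left out here.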

________________________________
From: Wes McKinney <we...@gmail.com>
Sent: Thursday, June 25, 2020 10:13 PM
To: dev <de...@arrow.apache.org>
Subject: Re: Arrow for low-latency streaming of small batches?

Is it feasible to preallocate the memory region where you are writing
the record batch?


Re: Arrow for low-latency streaming of small batches?

Posted by Wes McKinney <we...@gmail.com>.
Is it feasible to preallocate the memory region where you are writing
the record batch?
