Posted to user@arrow.apache.org by Frederic Branczyk <fb...@gmail.com> on 2021/12/03 12:46:45 UTC

Aggregating values by other columns

Hello,

First of all thank you so much for your work on Arrow, it looks like a very
promising piece of technology.

I'm very new to Arrow, and I'm trying to understand whether Arrow is a good
fit for our use case (and if so, whether you could maybe give us some
pointers as to which data structures might make sense). We happen to use Go,
but I would think that for the scope of my questions the answer should be
language agnostic.

We have a workload that works with data whose table looks pretty much like this:

+----------+----------+-----------+-------+
| SeriesID | EntityID | Timestamp | Value |
+----------+----------+-----------+-------+

Data is written by participants of the system, keyed by SeriesID, with a
random, unpredictable EntityID, and with many values arriving at the same
time.

Queries against this data typically filter by a set of SeriesIDs, a set of
EntityIDs, and a certain time frame; the remaining rows are then summed up
and aggregated by EntityID, so that the result is basically a map of
EntityID to Value.
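
To make the data and query shape concrete, here is roughly how I picture it
in Go (a sketch only: the schema mirrors the table above, the ID/timestamp
types are assumptions, the Query type is just an illustration of the access
pattern rather than an existing API, and the import path may differ between
Arrow Go versions):

// Sketch only: the table above expressed as an Arrow schema, plus the shape
// of the query we need.
package sketch

import "github.com/apache/arrow/go/arrow"

var tableSchema = arrow.NewSchema([]arrow.Field{
    {Name: "SeriesID", Type: arrow.PrimitiveTypes.Int64},
    {Name: "EntityID", Type: arrow.PrimitiveTypes.Int64},
    // Could also be a timestamp type such as arrow.FixedWidthTypes.Timestamp_ns.
    {Name: "Timestamp", Type: arrow.PrimitiveTypes.Int64},
    {Name: "Value", Type: arrow.PrimitiveTypes.Float64},
}, nil)

// Query filters by SeriesIDs, EntityIDs and a [from, to) time range, then
// sums Value per EntityID.
type Query func(seriesIDs, entityIDs []int64, from, to int64) map[int64]float64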

Maybe this influences the answer: since we are dealing with a lot of data,
our hope is that we could store the data in object storage and essentially
memory-map it, with multiple layers of caches between object storage and
main memory.

At first glance, Arrow looks like a great fit, but I'd love to hear your
thoughts, as well as whether a particular strategy or data structure comes
to mind for a workload like this.

Best regards,
Frederic

Re: Aggregating values by other columns

Posted by Frederic Branczyk <fb...@gmail.com>.
Hello,

First of all, thank you very much for replying! I realize that while I tried
to describe the problem, I didn't go into much detail, so I appreciate that
you took the time to reply so thoroughly anyway!

We already know that the write volume is in the hundreds of thousands of
writes per second, possibly going into the millions, and this data needs to
be queried at low latency. We have the very nice property that we can treat
all of this as an append-only/immutable workload, where updates never
happen. Retention might eventually cause deletes, but the idea so far is to
partition the data into blocks of time so we can simply throw away whole
blocks for retention purposes (a rough sketch of that idea is below).
Pre-aggregating data on different dimensions seems like an option, as does
storing the data in several layouts to optimize for the different access
patterns. But ultimately this will need experimenting with Arrow in place.
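
To illustrate what I mean by blocks of time (a hypothetical layout and
helper names, nothing we have built yet):

// Sketch only: one block per hour of data; retention then just means
// deleting the directories of blocks that have aged out.
package sketch

import (
    "fmt"
    "os"
    "time"
)

// blockDir returns the directory holding the block that starts at t, e.g.
// "data/blocks/2021-12-03T12".
func blockDir(root string, t time.Time) string {
    return fmt.Sprintf("%s/blocks/%s", root, t.UTC().Format("2006-01-02T15"))
}

// dropExpiredBlock removes a whole block once it falls outside the retention
// window; no row-level deletes are ever needed.
func dropExpiredBlock(root string, blockStart time.Time, retention time.Duration) error {
    if time.Since(blockStart) < retention {
        return nil
    }
    return os.RemoveAll(blockDir(root, blockStart))
}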

One requirement we do have, though, is that we must be able to run this at
small scale within a single process as well, so a large dependency is not
going to be workable for us.

I think something I'm still trying to wrap my head around is how data
filtering and the subsequent computation are intended to work with Arrow. I
realize this may also depend on the concrete situation, but I'd be
interested in hearing how the platforms you mentioned might be doing it.

The way I would approach it is:

* Load the data in Arrow format (potentially from Parquet stored in object
storage).
* Filtering: this is the part I'm still most unsure about. Judging by other
Arrow implementations that have filtering kernels available (it appears Go
doesn't, but they look simple enough, such as the Rust
<https://docs.rs/arrow/latest/src/arrow/compute/kernels/filter.rs.html#250-279>
one), I would in the worst case allocate roughly 2n in total (the original
plus a nearly full copy), since in the worst case the filtered data set is
identical to the input, or close to it. Potentially, while filtering, lay
out the data in a way that lets me use as many array operations as possible
(a rough sketch of what I mean follows this list).
* Execute the array operations, and then use the result.
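
To make that concrete, here is a minimal sketch of what I have in mind. The
filtering and aggregation are written by hand since the Go library doesn't
seem to ship compute kernels, the column order and types match the table
above, and the import path may need adjusting for the Arrow Go version in
use:

// Sketch only: hand-rolled filter + group-by-sum over one Arrow record batch.
package main

import (
    "fmt"

    "github.com/apache/arrow/go/arrow"
    "github.com/apache/arrow/go/arrow/array"
    "github.com/apache/arrow/go/arrow/memory"
)

// aggregate scans the batch once, keeps rows whose SeriesID/EntityID are in
// the given sets and whose Timestamp is in [from, to), and sums Value per
// EntityID.
func aggregate(rec array.Record, seriesIDs, entityIDs map[int64]bool, from, to int64) map[int64]float64 {
    series := rec.Column(0).(*array.Int64)
    entity := rec.Column(1).(*array.Int64)
    ts := rec.Column(2).(*array.Int64)
    val := rec.Column(3).(*array.Float64)

    out := make(map[int64]float64)
    for i := 0; i < int(rec.NumRows()); i++ {
        if !seriesIDs[series.Value(i)] || !entityIDs[entity.Value(i)] {
            continue
        }
        if t := ts.Value(i); t < from || t >= to {
            continue
        }
        out[entity.Value(i)] += val.Value(i)
    }
    return out
}

func main() {
    pool := memory.NewGoAllocator()
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "SeriesID", Type: arrow.PrimitiveTypes.Int64},
        {Name: "EntityID", Type: arrow.PrimitiveTypes.Int64},
        {Name: "Timestamp", Type: arrow.PrimitiveTypes.Int64},
        {Name: "Value", Type: arrow.PrimitiveTypes.Float64},
    }, nil)

    // Build a tiny example batch.
    b := array.NewRecordBuilder(pool, schema)
    defer b.Release()
    b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 1, 2}, nil)
    b.Field(1).(*array.Int64Builder).AppendValues([]int64{10, 11, 10}, nil)
    b.Field(2).(*array.Int64Builder).AppendValues([]int64{100, 200, 150}, nil)
    b.Field(3).(*array.Float64Builder).AppendValues([]float64{1.5, 2.5, 4.0}, nil)
    rec := b.NewRecord()
    defer rec.Release()

    res := aggregate(rec, map[int64]bool{1: true}, map[int64]bool{10: true, 11: true}, 0, 300)
    fmt.Println(res) // map[10:1.5 11:2.5]
}

(This fuses the filter into the aggregation instead of materializing a
filtered copy, which sidesteps the ~2n allocation mentioned above; whether a
materialized intermediate is worth it probably depends on how many different
aggregations run over the same filtered set. For real data this would of
course iterate over many record batches rather than one in-memory batch.)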

Does this seem idiomatic or am I maybe missing some components or
strategies that are typical?

Best regards,
Frederic

PS: I also have some Go-specific questions now that I've played a bit with
the library, but I'll create a separate thread for those.


Re: Aggregating values by other columns

Posted by Weston Pace <we...@gmail.com>.
This is a good description of the problem but any data solution
requires a lot of details and there will always be many ways to solve
it.  Here is a smattering of advice.

You have quite a bit of filtering.  How many rows total do you
typically return from a query?  If you have millions / billions of
rows but you are only ever querying 10-100 rows (pre-aggregation) at a
time, then a traditional row-major RDBMS will probably work pretty well.
Indices on the columns you are filtering will quickly identify the
rows that need to be loaded and all the caching is going to be built
in.

On the other hand, if you need the interoperability of Arrow, or you
are going to be querying large result sets, then using column-based
storage backed by Arrow sounds like a good idea.

You will probably find you want to partition your data by some of the
columns you are querying.  For example, you could partition it by
SeriesID and Timestamp or by all three columns or just one column.
The smaller the partition the more precise your queries can be when
selecting what data to load.  However, the smaller the partition the
more overhead you are going to have (more files, less effective I/O /
prefetch, etc.).  So the appropriate granularity is going to depend on your
data and your queries.
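
For example (an illustrative, made-up layout, not something Arrow
prescribes), partitioning by SeriesID and day might give you a directory
structure like:

data/
  series_id=17/date=2021-12-01/part-0.parquet
  series_id=17/date=2021-12-02/part-0.parquet
  series_id=42/date=2021-12-01/part-0.parquet
  ...

A query for one SeriesID over a two-day window then only needs to open the
files under the matching directories.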

Depending on your partitioning you may also struggle with updates.  If
you get a lot of updates that divide into many small batches once
partitioned then you will end up with lots of tiny files (not a good
thing).  So then you will probably need some kind of periodic batching
together of files.

You'll want some kind of tool to do the filtering & compute work, both
pushdown filtering (using the file metadata & directory names to
select which data to load) and post-filtering (removing rows that you
don't want but couldn't identify through metadata alone).  Some of the
implementations have this built in and others don't.  I don't know Go
well enough to say for certain where it falls.  For the ones that
don't there are query engines out there you can use and you can
communicate with them via flight, shared IPC files, the C data
interface, or any number of ways (might want to check what Go
supports).
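
As a rough sketch (a hypothetical helper, assuming a made-up series_id=...
path layout like the one above), the pushdown part can be as simple as
pruning files by their partition directory names before reading them; the
post-filter then runs row by row on whatever the surviving files contain:

// Sketch only: keep just the files whose series_id=... path segment is in
// the wanted set; everything else never needs to be read at all.
package sketch

import (
    "path/filepath"
    "strconv"
    "strings"
)

func pruneBySeriesID(files []string, wanted map[int64]bool) []string {
    var keep []string
    for _, f := range files {
        for _, seg := range strings.Split(filepath.ToSlash(f), "/") {
            if !strings.HasPrefix(seg, "series_id=") {
                continue
            }
            id, err := strconv.ParseInt(strings.TrimPrefix(seg, "series_id="), 10, 64)
            if err == nil && wanted[id] {
                keep = append(keep, f)
                break
            }
        }
    }
    return keep
}

// The surviving files are then loaded (e.g. as Arrow record batches read
// from Parquet) and post-filtered / aggregated row by row.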

So I think my general advice is that what you are describing is
probably a great fit for Arrow, but it's going to be a fair amount of
work.  There are lots of solutions out there that build on Arrow and
will do some parts of this work for you.  For example, datafusion,
duckdb, iceberg, nessie, etc.  I don't know that this mailing list
will be able to provide comprehensive advice on the entire ecosystem
of tools out there.
