Posted to dev@arrow.apache.org by Li Jin <ic...@gmail.com> on 2022/01/11 21:19:02 UTC

Arrow streaming computation engine

Hi,

This is a somewhat lengthy email with some thoughts on a streaming
computation engine for Arrow datasets, and I would like to hear feedback
from Arrow devs.

The main use case we have in mind for the streaming engine is time
series data, i.e., data that arrives in time order (e.g., daily US stock
prices) and queries that often follow the time order of the data (e.g.,
computing a 7-day rolling mean of daily US stock prices).

The main motivations for a streaming engine are:
(1) Performance: a small amount of hot data is always kept in memory and
cache.
(2) Memory efficiency: the engine only needs to keep a small amount of
data in memory. E.g., for the 7-day rolling mean, the engine never needs
to keep more than 7 days' worth of stock price data, even when computing
it over a stream of 20 years of data.
(3) Live data applications: data arrives in real time.
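To make the memory-efficiency point in (2) concrete, here is a minimal
Python sketch (not Arrow code; the function name `rolling_mean` and the
`(date, price)` row shape are hypothetical) of a 7-day rolling mean that
only retains the rows inside the current window. It assumes rows arrive in
ascending date order, as in the time series use case above.

```python
from collections import deque
from datetime import date, timedelta
from typing import Deque, Iterable, Iterator, Tuple


def rolling_mean(
    prices: Iterable[Tuple[date, float]], window_days: int = 7
) -> Iterator[Tuple[date, float]]:
    """Yield (day, mean price over the trailing `window_days` calendar days).

    Assumes `prices` arrives in ascending date order.  Only rows inside the
    current window are retained, so memory is bounded by the window size,
    not by the length of the stream.
    """
    window: Deque[Tuple[date, float]] = deque()
    total = 0.0
    for day, price in prices:
        window.append((day, price))
        total += price
        # Keep rows with row_day > day - window_days (the last 7 calendar
        # days including today) and evict everything older.
        cutoff = day - timedelta(days=window_days)
        while window[0][0] <= cutoff:
            _, old_price = window.popleft()
            total -= old_price
        yield day, total / len(window)


# Usage: ten consecutive days with prices 0.0, 1.0, ..., 9.0
start = date(2022, 1, 1)
stream = ((start + timedelta(days=i), float(i)) for i in range(10))
for day, mean in rolling_mean(stream):
    print(day, mean)
```

The window is defined in calendar days, so weekends and holidays simply
mean fewer rows in the buffer. The running `total` can accumulate
floating-point error over very long streams; a more careful version might
periodically recompute it from the buffered rows.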

I have talked to Phillip Cloud and am aware of an ongoing effort to build
a computation engine for SQL-like queries (mostly queries over the entire
dataset), but I am unfamiliar with the details. So I am wondering if there
is a way to design an engine that can support both streaming and batch
modes of processing. Or, if they need to be separate engines, can we
minimize the amount of duplication?

Looking forward to any thoughts around this.

Li

Re: Arrow streaming computation engine

Posted by QP Hou <ho...@gmail.com>.
For DataFusion (the Rust engine that Weston mentioned), the community
is about to start building a proof of concept (PoC) for a streaming
engine. The discussion is happening at
https://github.com/apache/arrow-datafusion/issues/1544.

On Tue, Jan 11, 2022 at 3:29 PM Weston Pace <we...@gmail.com> wrote:

Re: Arrow streaming computation engine

Posted by Weston Pace <we...@gmail.com>.
> Would appreciate it if you can give some pointers to how to
> start playing with that code.

I have a (somewhat) minimal example here:
https://gist.github.com/westonpace/e555a3b1c269c31de7176d34f47a2fb0
The PR I mentioned earlier
(https://github.com/apache/arrow/pull/12033) has more examples (thanks
Vibhatha).

On Wed, Jan 12, 2022 at 10:11 AM Li Jin <ic...@gmail.com> wrote:

Re: Arrow streaming computation engine

Posted by Li Jin <ic...@gmail.com>.
Weston - Thanks for the pointer. The C++ streaming engine you pointed out
is a lot like what I have in mind. I will take a close look at it. Would
appreciate it if you can give some pointers to how to start playing with
that code.

Hou - Glad to hear that the DataFusion community has similar ideas. I look
forward to exchanging more ideas.

Thanks!
Li


On Tue, Jan 11, 2022 at 6:22 PM Weston Pace <we...@gmail.com> wrote:


Re: Arrow streaming computation engine

Posted by Weston Pace <we...@gmail.com>.
First, note that there are different computation engines in different
languages.  For example, the Rust implementation has DataFusion[1].
For the rest of this email, I will speak specifically, and in more
detail, about the C++ computation engine that is in place today (which
I am more familiar with).  The C++ engine is documented here[2],
although that documentation is a little scarce and we are working on an
updated version[3].

Also note that the docs call it a "streaming execution engine"
because it operates on the data in a batch-oriented fashion.  However,
this doesn't guarantee that it will use a small amount of memory.  For
example, if you were to request that the engine sort the data, then the
engine may need to cache the entire dataset in memory (in the future
this may mean spilling into temporary tables as memory runs out) in
order to fulfill that query (because the very last row you read might
be the very first row you need to emit).  However, for properly
constructed queries, the engine should be able to operate as you are
describing.  The queries you are describing sound to me like what one
might expect to find in a "time series database", which is another term
I've heard thrown around.
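A toy illustration of that distinction, using plain Python lists as
stand-ins for record batches (these functions are hypothetical and not
part of the Arrow API): a filter can emit each batch as soon as it is
read, while a sort has to buffer the whole input before it can emit
anything.

```python
from typing import Iterable, Iterator, List

Batch = List[int]  # stand-in for a record batch


def filter_positive(batches: Iterable[Batch]) -> Iterator[Batch]:
    """Streaming operator: each output batch depends on one input batch,
    so only one batch needs to be held at a time."""
    for batch in batches:
        yield [x for x in batch if x > 0]


def sort_all(batches: Iterable[Batch], batch_size: int = 2) -> Iterator[Batch]:
    """Blocking operator: the last row read may be the first row to emit,
    so the entire input is buffered before anything is produced."""
    buffered: List[int] = []
    for batch in batches:
        buffered.extend(batch)  # memory grows with the whole input
    buffered.sort()
    for i in range(0, len(buffered), batch_size):
        yield buffered[i : i + batch_size]


# Usage
print(list(filter_positive([[3, -1], [2, 5]])))  # [[3], [2, 5]]
print(list(sort_all([[3, 1], [2, 0]])))  # [[0, 1], [2, 3]]
```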

I am not an expert in time series databases, so I don't know the extent
of the computation required.  However, the example you give (a 7-day
rolling mean of daily US stock prices) is not something that can be
efficiently computed today.  It is something that could be efficiently
computed once "window functions" are supported.  Window functions[4]
are a query engine feature that provides the sliding window needed for
a rolling average.  I believe there are people at Voltron Data who
are hoping to add support for window functions to the C++ streaming
execution engine, but that is future work that is not currently in
progress.  That being said, a time series execution engine would
probably also need to know about indices, statistics, whether the data
on disk is sorted or not (and by what columns), downsampling functions,
interpolation functions, etc.  In addition, beyond execution /
computation there are concerns such as retention policies, streaming /
appending data to disk, etc.
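For reference, in SQL a window function expresses the rolling average as
`AVG(price) OVER (PARTITION BY symbol ORDER BY day ROWS BETWEEN 6
PRECEDING AND CURRENT ROW)`.  The sketch below is a hypothetical Python
evaluation of that frame in a streaming fashion, assuming rows arrive
ordered by day within each symbol; it keeps one bounded buffer per
partition.  It is not how Arrow or any particular engine implements
window functions.

```python
from collections import defaultdict, deque
from typing import DefaultDict, Deque, Iterable, Iterator, Tuple

Row = Tuple[str, int, float]  # (symbol, day, price): a hypothetical schema


def windowed_avg(rows: Iterable[Row], preceding: int = 6) -> Iterator[Row]:
    """Streaming evaluation of
        AVG(price) OVER (PARTITION BY symbol ORDER BY day
                         ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW)
    assuming rows arrive ordered by day within each symbol.  Each partition
    keeps at most preceding + 1 prices."""
    frames: DefaultDict[str, Deque[float]] = defaultdict(
        lambda: deque(maxlen=preceding + 1)  # oldest price drops off automatically
    )
    for symbol, day, price in rows:
        frame = frames[symbol]
        frame.append(price)
        yield symbol, day, sum(frame) / len(frame)


# Usage: two interleaved symbols with a 2-row frame (1 preceding + current)
rows = [("A", 0, 1.0), ("B", 0, 10.0), ("A", 1, 3.0), ("B", 1, 20.0), ("A", 2, 5.0)]
for out in windowed_avg(rows, preceding=1):
    print(out)
```

A `ROWS` frame counts rows rather than calendar days, so with missing
days (weekends, holidays) a 7-row frame spans 7 trading days.  SQL also
defines `RANGE` frames for value-based windows, though support varies by
engine.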

> So I am wondering if there is
> a way to design an engine that can satisfy both streaming and batch mode of
> processing. Or maybe it needs to be separate engines but we can minimize
> the amount of duplication?

Regardless of the timeline and plans for window functions, the answer
to this specific question is probably "yes", but I'm not enough of an
expert in time series processing to answer with certainty.  The
streaming execution engine in Arrow today is quite generic.  A graph
of "exec nodes" is constructed.  Data is passed through these exec
nodes, starting from one or more sources and ending at a sink.  The
sources could be live data to satisfy your requirement (3).  The plan
is currently run in a way very similar to an actor model, where batches
are pushed from one node to the next.  I'm hoping to add more support
for scheduling and backpressure at some point.  Given what I know of
the types of queries you are describing, I think this model should
suffice to run those queries efficiently.
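The push-based execution described above can be sketched roughly as
follows.  This is a hypothetical, single-threaded Python toy loosely
modeled on the idea of exec nodes; the class and method names are made
up for illustration and are not Arrow's C++ API, and there is no
scheduling or backpressure.

```python
from typing import Callable, Iterable, List, Optional

Batch = List[int]  # stand-in for a record batch


class Node:
    """Hypothetical base class: upstream nodes push batches downstream."""

    def __init__(self) -> None:
        self.output: Optional["Node"] = None

    def connect(self, downstream: "Node") -> "Node":
        self.output = downstream
        return downstream  # returned so calls can be chained

    def input_received(self, batch: Batch) -> None:
        raise NotImplementedError

    def input_finished(self) -> None:
        # By default, propagate end-of-stream downstream.
        if self.output is not None:
            self.output.input_finished()


class SourceNode(Node):
    """Pushes batches from any iterable: a finite list or a live generator."""

    def __init__(self, batches: Iterable[Batch]) -> None:
        super().__init__()
        self.batches = batches

    def start(self) -> None:
        assert self.output is not None, "source must be connected"
        for batch in self.batches:
            self.output.input_received(batch)
        self.output.input_finished()


class MapNode(Node):
    """Applies fn to every value and forwards the batch immediately."""

    def __init__(self, fn: Callable[[int], int]) -> None:
        super().__init__()
        self.fn = fn

    def input_received(self, batch: Batch) -> None:
        assert self.output is not None, "map node must be connected"
        self.output.input_received([self.fn(x) for x in batch])


class SinkNode(Node):
    """Collects batches; a real sink might write them out instead."""

    def __init__(self) -> None:
        super().__init__()
        self.batches: List[Batch] = []
        self.finished = False

    def input_received(self, batch: Batch) -> None:
        self.batches.append(batch)

    def input_finished(self) -> None:
        self.finished = True


# Usage: source -> map -> sink
source = SourceNode([[1, 2], [3]])
sink = SinkNode()
source.connect(MapNode(lambda x: x * 10)).connect(sink)
source.start()
print(sink.batches, sink.finished)  # [[10, 20], [30]] True
```

Because the source only needs an iterable, the same graph would run over
a finite list (batch mode) or over a generator that yields batches as
they arrive (live mode), which is one way a single set of operators
could serve both modes.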

To summarize, I think some of the work we are doing will be useful
to you (though possibly not sufficient), and it would be a good idea to
reuse & share where possible.

[1] https://docs.rs/datafusion/latest/datafusion/
[2] https://arrow.apache.org/docs/cpp/streaming_execution.html
[3] https://github.com/apache/arrow/pull/12033
[4] https://medium.com/an-idea/rolling-sum-and-average-window-functions-mysql-7509d1d576e6

On Tue, Jan 11, 2022 at 11:19 AM Li Jin <ic...@gmail.com> wrote: