Posted to user@beam.apache.org by "Kaymak, Tobias" <to...@ricardo.ch> on 2020/06/16 09:23:37 UTC

RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Hello,

I've attached my poor drawing to this email, that illustrates my idea:

I am dealing with two unbounded, very large streams of data that should be
joined (articles and assets).

My first step is to have a pipeline #1 that ingests all assets and streams
them into a fast key-lookup store.
My second step would be to have a pipeline #2 that uses a CoGroupByKey to
join those two streams and then does a batched RPC against that fast
key-lookup store to fetch any historical data.

This way I make sure that I always have the latest data available
(even when pipeline #1 is broken or late for a couple of minutes), and the
batched RPC gives me the complete history (so I don't need to keep a huge
state in Beam on the workers).
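
A minimal sketch of the second step, written in plain Python rather than
against the Beam API so that it runs on its own. It models what a
CoGroupByKey produces (values from both inputs grouped per key) and then
fetches history with one call per batch of keys. The names co_group_by_key,
lookup_many, STORE and the batch size are assumptions made for illustration;
they are not part of Beam or of any key-lookup store's client.

```python
from collections import defaultdict


def co_group_by_key(articles, assets):
    """Group two lists of (key, value) pairs per key, like a CoGroupByKey result."""
    grouped = defaultdict(lambda: {"articles": [], "assets": []})
    for key, value in articles:
        grouped[key]["articles"].append(value)
    for key, value in assets:
        grouped[key]["assets"].append(value)
    return dict(grouped)


def batched(keys, batch_size):
    """Yield successive fixed-size batches of keys."""
    keys = list(keys)
    for start in range(0, len(keys), batch_size):
        yield keys[start:start + batch_size]


def enrich(grouped, lookup_many, batch_size=2):
    """Attach history from the key-lookup store, using one RPC per batch of keys."""
    enriched = {}
    for batch in batched(sorted(grouped), batch_size):
        history = lookup_many(batch)  # a single call covers the whole batch
        for key in batch:
            enriched[key] = dict(grouped[key], history=history.get(key, []))
    return enriched


# Hypothetical stand-in for the fast key-lookup store filled by pipeline #1.
STORE = {"a1": ["asset-old-1"], "a3": ["asset-old-3"]}
rpc_calls = []


def lookup_many(keys):
    rpc_calls.append(list(keys))  # record each simulated RPC
    return {k: STORE[k] for k in keys if k in STORE}


articles = [("a1", "article-1"), ("a2", "article-2"), ("a3", "article-3")]
assets = [("a1", "asset-new-1"), ("a3", "asset-new-3")]
result = enrich(co_group_by_key(articles, assets), lookup_many)
```

With three keys and a batch size of two, the store is called twice instead
of three times; a key with no history simply gets an empty list.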

Does this pattern make sense?
[image: image.png]
Cheers,
Tobi

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Alexey Romanenko <ar...@gmail.com>.
Yes, if you don't need strong delivery semantics, you can just instantiate a writer in the DoFn's @Setup method and use it for every bundle in your write DoFn. If the writer supports a pool of connections, then @Setup is a good place for that too; you then get a new connection for every bundle in @StartBundle.

Take a look at the CassandraIO.Write [1] or JdbcIO.Write [2] implementations. They are based on DoFn and could be helpful as examples.

[1] https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1137
[2] https://github.com/apache/beam/blob/61b665640d6c0f91751bba59782c0ac6aceacba6/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1332
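
To make the lifecycle concrete, here is a small plain-Python model of it. It
is a sketch, not Beam code: the method names mirror the DoFn hooks (setup,
start_bundle, process, finish_bundle, teardown), while FakeCluster,
FakeSession and run_bundles are hypothetical stand-ins for a database driver
and for the runner. The expensive object is created once per DoFn instance,
and a connection is taken from it once per bundle.

```python
class FakeCluster:
    """Hypothetical stand-in for a driver object that hands out connections."""

    def __init__(self):
        self.sessions_opened = 0
        self.rows = []  # everything written through any session
        self.closed = False

    def connect(self):
        self.sessions_opened += 1
        return FakeSession(self)

    def close(self):
        self.closed = True


class FakeSession:
    """Hypothetical connection that records writes on its cluster."""

    def __init__(self, cluster):
        self._cluster = cluster
        self.closed = False

    def write(self, row):
        self._cluster.rows.append(row)

    def close(self):
        self.closed = True


class WriteFn:
    """Models a write DoFn: heavy setup once, one connection per bundle."""

    def setup(self):
        self.cluster = FakeCluster()  # created once per DoFn instance

    def start_bundle(self):
        self.session = self.cluster.connect()  # one connection per bundle

    def process(self, element):
        self.session.write(element)

    def finish_bundle(self):
        self.session.close()

    def teardown(self):
        self.cluster.close()


def run_bundles(fn, bundles):
    """Drive the lifecycle the way a runner would, in simplified form."""
    fn.setup()
    try:
        for bundle in bundles:
            fn.start_bundle()
            for element in bundle:
                fn.process(element)
            fn.finish_bundle()
    finally:
        fn.teardown()


fn = WriteFn()
run_bundles(fn, [["a", "b"], ["c"], ["d", "e"]])
```

The caveat about delivery semantics still applies: a runner may retry a
failed bundle, so rows written this way can reach the store more than once
unless the writes are idempotent.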

> On 18 Jun 2020, at 10:49, Kaymak, Tobias <to...@ricardo.ch> wrote:
> 
> So I came up with this - everything is handled within the same pipeline and after the CoGroupByKey for the articles (articleId) and assets (articleId) there is a ParDo doing RPC (not batched) towards Cassandra to first write out new asset information for an article, then fetch any historical data per articleId from Cassandra, and then write the merged result out via BigQueryIO.
> 
> My open question is - if I want to do RPC calls in the ParDo, I should handle my own Cassandra connection in it, one per ParDo, right?
> 
> [Batching the RPCs is right now not a concern, as I expect Cassandra to scale for these reads and writes quite well.] 
> 
> <image.png>


Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Luke Cwik <lc...@google.com>.
You're correct that you need to manage the connections. It would be wise to
use a connection pool to help reuse connections across bundles until Apache
Beam provides something that makes managing these kinds of things easier.
See this thread[1] that Ismael started.

1:
https://lists.apache.org/thread.html/rc566016e34df0667e30ad025d1550dfa69511b439b723759247560e7%40%3Cdev.beam.apache.org%3E
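
A rough sketch of what reusing connections across bundles can look like,
in plain Python with only the standard library. ConnectionPool,
make_connection and process_bundle are hypothetical names for illustration;
a real driver would usually ship its own pool. The pool is created once and
every bundle borrows a connection and hands it back when done.

```python
import queue
from contextlib import contextmanager


class ConnectionPool:
    """Fixed-size pool; borrowing blocks while all connections are in use."""

    def __init__(self, factory, size):
        self._idle = queue.Queue()
        for _ in range(size):
            self._idle.put(factory())

    @contextmanager
    def connection(self):
        conn = self._idle.get()  # waits if no connection is idle
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always hand the connection back

    def idle_count(self):
        return self._idle.qsize()


created = []


def make_connection():
    """Hypothetical factory; a dict stands in for a real connection."""
    conn = {"id": len(created), "uses": 0}
    created.append(conn)
    return conn


# Created once and shared by all bundles processed in this process.
pool = ConnectionPool(make_connection, size=2)


def process_bundle(bundle):
    with pool.connection() as conn:
        for _ in bundle:
            conn["uses"] += 1  # stands in for one RPC per element


for bundle in [["a", "b"], ["c"], ["d", "e", "f"]]:
    process_bundle(bundle)
```

Three bundles are processed, but only the two pooled connections are ever
created.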

On Thu, Jun 18, 2020 at 1:49 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> So I came up with this - everything is handled within the same pipeline
> and after the CoGroupByKey for the articles (articleId) and assets
> (articleId) there is a ParDo doing RPC (not batched) towards Cassandra to
> first write out new asset information for an article, then fetch any
> historical data per articleId from Cassandra, and then write the merged
> result out via BigQueryIO.
>
> My open question is - if I want to do RPC calls in the ParDo, I should
> handle my own Cassandra connection in it, one per ParDo, right?
>
> [Batching the RPCs is right now not a concern, as I expect Cassandra to
> scale for these reads and writes quite well.]
>
> [image: image.png]

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
So I came up with this - everything is handled within the same pipeline and
after the CoGroupByKey for the articles (articleId) and assets (articleId)
there is a ParDo doing RPC (not batched) towards Cassandra to first write
out new asset information for an article, then fetch any historical data per
articleId from Cassandra, and then write the merged result out via
BigQueryIO.

My open question is - if I want to do RPC calls in the ParDo, I should
handle my own Cassandra connection in it, one per ParDo, right?

[Batching the RPCs is right now not a concern, as I expect Cassandra to
scale for these reads and writes quite well.]
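
A sketch of the per-key logic inside that ParDo, in plain Python. The
InMemoryStore class and its upsert_assets and fetch_assets methods are
hypothetical stand-ins for the Cassandra calls, and the row layout is an
assumption. The write is modelled as an upsert keyed by asset id, so
processing the same element twice (for example after a bundle retry) leaves
the result unchanged.

```python
class InMemoryStore:
    """Hypothetical stand-in for Cassandra: articleId -> {assetId: payload}."""

    def __init__(self, preloaded=None):
        self._assets = {
            article_id: dict(assets)
            for article_id, assets in (preloaded or {}).items()
        }

    def upsert_assets(self, article_id, assets):
        # Writing the same asset id again overwrites it, so retries are harmless.
        self._assets.setdefault(article_id, {}).update(assets)

    def fetch_assets(self, article_id):
        return dict(self._assets.get(article_id, {}))


def enrich(store, article_id, articles, new_assets):
    """Write the new assets, read back the full history, emit merged rows."""
    store.upsert_assets(article_id, new_assets)
    all_assets = store.fetch_assets(article_id)
    return [
        {"articleId": article_id, "article": article, "assets": all_assets}
        for article in articles
    ]


# The store is pre-loaded with history, as described in the thread.
store = InMemoryStore({"a1": {"img-1": "old.jpg"}})
rows = enrich(store, "a1", ["article-1"], {"img-2": "new.jpg"})
rows_again = enrich(store, "a1", ["article-1"], {"img-2": "new.jpg"})
```

Because the write happens before the read for the same key, the merged row
contains both the pre-loaded history and the asset that just arrived.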

[image: image.png]

On Wed, Jun 17, 2020 at 6:28 PM Luke Cwik <lc...@google.com> wrote:

> You're right, 2 TiB is a lot for some runners. Not all runners need to keep
> state in memory; some may use a backing disk to store state that isn't
> being used, so only the state that is part of the working set has to fit in
> memory.
>
> Your pipeline using a GBK followed by an enrichment ParDo makes sense.

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Luke Cwik <lc...@google.com>.
You're right, 2 TiB is a lot for some runners. Not all runners need to keep
state in memory; some may use a backing disk to store state that isn't
being used, so only the state that is part of the working set has to fit in
memory.

Your pipeline using a GBK followed by an enrichment ParDo makes sense.

On Wed, Jun 17, 2020 at 9:20 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> What about having just one pipeline and a pre-loaded Cassandra with the
> history?
>
> Like: the pipeline reads from Kafka, groups by key (for assets and
> articles that would be articleId), and then inserts the new assets into
> Cassandra inside a ParDo that also looks up the history for a given key
> (the enrichment) at the same time. Does that make sense?
>
> I fear that the stateful DoFn would need a lot of memory as well - so it
> would need to distribute the 2TiB among the workers (?)

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Kenneth Knowles <ke...@apache.org>.
+Mikhail Gryzykhin <mi...@google.com> and +Tyson Hamilton
<ty...@google.com> with whom I have discussed stream-stream joins similar
to this (not sure how closely they are following user@)

Kenn

On Wed, Jun 17, 2020 at 9:20 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> What about having just one pipeline and a pre-loaded Cassandra with the
> history?
>
> Like: the pipeline reads from Kafka, groups by key (for assets and
> articles that would be articleId), and then inserts the new assets into
> Cassandra inside a ParDo that also looks up the history for a given key
> (the enrichment) at the same time. Does that make sense?
>
> I fear that the stateful DoFn would need a lot of memory as well - so it
> would need to distribute the 2TiB among the workers (?)

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
What about having just one pipeline and a pre-loaded Cassandra with the
history?

Like: the pipeline reads from Kafka, groups by key (for assets and
articles that would be articleId), and then inserts the new assets into
Cassandra inside a ParDo that also looks up the history for a given key
(the enrichment) at the same time. Does that make sense?

I fear that the stateful DoFn would need a lot of memory as well - so it
would need to distribute the 2TiB among the workers (?)

On Wed, Jun 17, 2020 at 6:01 PM Luke Cwik <lc...@google.com> wrote:

> You would need to ingest all the data into the pipeline (the 2 TiB) to be
> able to have the pipeline manage the state. Side inputs would only work if
> you could find a runner that allows for streaming side inputs that are that
> large (none that I'm aware of).
>
> For a stateful DoFn, you would "flatten" all the data from Cassandra with
> the Kafka data and feed this into a stateful DoFn. The Cassandra cluster
> would be a bounded source, so the watermark would prevent reading from Kafka
> until all the Cassandra data has been read into the pipeline. The stateful
> DoFn key would be whatever the join criterion is for the two streams (like a
> userId).

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Luke Cwik <lc...@google.com>.
You would need to ingest all the data into the pipeline (the 2 TiB) to be
able to have the pipeline manage the state. Side inputs would only work if
you could find a runner that allows for streaming side inputs that are that
large (none that I'm aware of).

For a stateful DoFn, you would "flatten" all the data from Cassandra with
the Kafka data and feed this into a stateful DoFn. The Cassandra cluster
would be a bounded source, so the watermark would prevent reading from Kafka
until all the Cassandra data has been read into the pipeline. The stateful
DoFn key would be whatever the join criterion is for the two streams (like a
userId).
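
As a rough illustration of that idea, here is a plain-Python model of a
keyed, stateful join. It is not Beam code: the state dict stands in for
per-key user state, and chaining the historical elements ahead of the live
ones is a simplification of the ordering described above. The element shape
(key, kind, value) and the function name stateful_join are assumptions made
for the example.

```python
from itertools import chain


def stateful_join(elements):
    """Keep per-key state and emit a joined view once an article is known."""
    state = {}  # key -> {"article": ..., "assets": [...]}; models user state
    for key, kind, value in elements:
        entry = state.setdefault(key, {"article": None, "assets": []})
        if kind == "article":
            entry["article"] = value
        else:
            entry["assets"].append(value)
        if entry["article"] is not None:
            # Copy the list so later updates do not change emitted results.
            yield key, entry["article"], list(entry["assets"])


# Bounded history (the Cassandra export) followed by the live Kafka data.
historical = [("a1", "asset", "old-1"), ("a1", "asset", "old-2")]
live = [
    ("a1", "article", "article-1"),
    ("a1", "asset", "new-1"),
    ("a2", "asset", "orphan"),
]

outputs = list(stateful_join(chain(historical, live)))
```

The first output for a1 already carries both historical assets, and the
asset for a2 produces nothing because no article has been seen for that key.
All of the history lives in the state, which is why the full 2 TiB would
have to be ingested.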



On Wed, Jun 17, 2020 at 7:46 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> Thank you Luke.
>
> I do realize that the lag is a problem, but I fear I am not fully grasping
> what you mean by "relying on the pipeline managing the state".
> What is missing from the picture is that the Cassandra cluster will be
> pre-loaded with the complete history of data, which is around 2 TiB
> uncompressed; afterwards it will be constantly updated through Kafka.
>
> If the two input streams only hold data for 14 days how would a CoGBK get
> the whole picture - or how could I rely on a side input or user state?

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by "Kaymak, Tobias" <to...@ricardo.ch>.
Thank you Luke.

I do realize that the lag is a problem, but I fear I am not fully grasping
what you mean by "relying on the pipeline managing the state".
What is missing from the picture is that the Cassandra cluster will be
pre-loaded with the complete history of data, which is around 2 TiB
uncompressed; afterwards it will be constantly updated through Kafka.

If the two input streams only hold data for 14 days how would a CoGBK get
the whole picture - or how could I rely on a side input or user state?

On Tue, Jun 16, 2020 at 10:06 PM Luke Cwik <lc...@google.com> wrote:

> One issue that you are going to run into is that you can't enforce that
> the history is up to date as the two pipelines are running with their own
> lag. Even within the same pipeline this is very hard to do correctly and a
> lot of times it is much easier to rely on the pipeline managing the state.
>
> Other than the CoGBK, you could also use user state or a side input to do
> the join.

Re: RFC - Idea: joining two streams with limited history (days), then doing batched RPC for full history

Posted by Luke Cwik <lc...@google.com>.
One issue that you are going to run into is that you can't enforce that the
history is up to date, as the two pipelines are running with their own lag.
Even within the same pipeline this is very hard to do correctly, and often
it is much easier to rely on the pipeline managing the state.

Other than the CoGBK, you could also use user state or a side input to do
the join.



On Tue, Jun 16, 2020 at 2:24 AM Kaymak, Tobias <to...@ricardo.ch>
wrote:

> Hello,
>
> I've attached my poor drawing to this email, that illustrates my idea:
>
> I am dealing with two unbounded, very large streams of data that should be
> joined (articles and assets).
>
> My first step is to have a pipeline #1 that ingests all assets and streams
> them into a fast key-lookup store.
> My second step would be to have a pipeline #2 that uses a CoGroupByKey to
> join those two streams and then does a batched RPC against that fast
> key-lookup store to fetch any historical data.
>
> This way I make sure that I always have the latest data available
> (even when pipeline #1 is broken or late for a couple of minutes), and the
> batched RPC gives me the complete history (so I don't need to keep a huge
> state in Beam on the workers).
>
> Does this pattern make sense?
> [image: image.png]
> Cheers,
> Tobi
>