Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2018/05/08 10:22:38 UTC

Streaming and batch jobs together

Hi all,
I'd like to introduce in our pipeline an efficient way to aggregate
incoming data around an entity.

Basically, we have new incoming facts that are added to (and potentially
removed from) an entity, identified by id. For example, when we receive a new
name for a city, we add this name to the known names of that city id (if the
first field of the tuple is ADD; if it is DEL, we remove it).
At the moment we use a batch job to generate an initial version of the
entities, another job that adds facts to this initial version, and a third
one that merges the base and the computed data. This is very inefficient in
terms of speed and disk space (because every step requires materializing the
data on disk).

I was wondering whether Flink could help here. There are a couple
of requirements that make things complicated:

   - state could potentially be large (a lot of data related to an
   entity). Is there any limit on state size?
   - data must be readable by a batch job. If I'm not wrong, this could be
   easily solved by periodically flushing data to an external sink (like
   HBase or similar)
   - how can we keep the long-running stream job up and run a batch job at
   the same time? Will this be possible after FLIP-6?
   - how should new data be ingested? Do I really need Kafka, or can I just
   add new datasets to a staging HDFS dir (and move them to another dir once
   ingested)?

Best,
Flavio

Re: Streaming and batch jobs together

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok, thanks for the clarification Kostas. What about multiple jobs running
at the same time?

On Wed, 9 May 2018, 14:39 Kostas Kloudas, <k....@data-artisans.com>
wrote:

> Hi Flavio,
>
> Flink has no inherent limit on state size, apart
> from the fact that the state associated with a *single key*
> (not the total state) should fit in memory. For production use, it is also
> advisable to use the RocksDB state backend, as this
> allows state to spill to disk when it grows too large.
>
> As for a recommended DB/NoSQL store, I have no recommendation.
> It depends on what you and your team are
> more familiar with. I suppose you are talking about the sink, right? In that
> case, it also depends on what will optimize the batch jobs
> that read the updated dataset.
>
> Thanks,
> Kostas

Re: Streaming and batch jobs together

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Flavio,

Flink has no inherent limit on state size, apart from the fact that the state associated with a single key
(not the total state) should fit in memory. For production use, it is also advisable to use the RocksDB state backend, as this
allows state to spill to disk when it grows too large.
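
As a rough illustration of the RocksDB suggestion, the fragment below sketches how the backend might be selected in flink-conf.yaml. The key names are the ones used in Flink 1.x releases and may differ in other versions, and the checkpoint path is a hypothetical placeholder, so treat this as a sketch to check against the documentation for your version rather than a recommended setup.

```yaml
# Sketch only: key names as in Flink 1.x; verify against your version's docs.
state.backend: rocksdb
# Hypothetical checkpoint location; replace with a path that exists in your cluster.
state.checkpoints.dir: hdfs:///flink/checkpoints
# Optional: only upload changed RocksDB files on each checkpoint.
state.backend.incremental: true
```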

As for a recommended DB/NoSQL store, I have no recommendation. It depends on what you and your team are
more familiar with. I suppose you are talking about the sink, right? In that case, it also depends on what will optimize the batch jobs
that read the updated dataset.

Thanks,
Kostas

> On May 8, 2018, at 10:40 PM, Flavio Pompermaier <po...@okkam.it> wrote:
> 
> Thanks! Both solutions are reasonable, but what about the max state size (per key)? Is there any suggested database/NoSQL store to use?


Re: Streaming and batch jobs together

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks! Both solutions are reasonable, but what about the max state size (per
key)? Is there any suggested database/NoSQL store to use?

On Tue, 8 May 2018, 18:09 TechnoMage, <ml...@technomage.com> wrote:

> If you use a KeyedStream you can group records by key (city) and then use
> a RichFlatMapFunction to aggregate state in a MapState or ListState per key.
> You can then have that operator publish the updated results as a new
> aggregated record, or send them to a database or similar, as you see fit.
>
> Michael

Re: Streaming and batch jobs together

Posted by TechnoMage <ml...@technomage.com>.
If you use a KeyedStream you can group records by key (city) and then use a RichFlatMapFunction to aggregate state in a MapState or ListState per key. You can then have that operator publish the updated results as a new aggregated record, or send them to a database or similar, as you see fit.
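
To make the per-key update concrete, here is a minimal, framework-free sketch of the ADD/DEL logic that such an operator would run. It deliberately does not use the Flink API: the HashMap stands in for keyed state (in a real job this would presumably be a MapState or ListState scoped to the current key), and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Framework-free model of the per-key update logic. In a Flink job the outer
// map would be replaced by keyed state inside a rich function applied after
// keyBy; that mapping is an assumption of this sketch.
class CityNameAggregator {
    // Hypothetical in-memory stand-in for keyed state: cityId -> known names.
    private final Map<String, Set<String>> namesByCity = new HashMap<>();

    // Applies one fact and returns a sorted copy of the names now known for that city.
    Set<String> apply(String op, String cityId, String name) {
        if (!"ADD".equals(op) && !"DEL".equals(op)) {
            throw new IllegalArgumentException("Unknown operation: " + op);
        }
        Set<String> names = namesByCity.computeIfAbsent(cityId, k -> new TreeSet<>());
        if ("ADD".equals(op)) {
            names.add(name);
        } else {
            names.remove(name);
        }
        if (names.isEmpty()) {
            // Drop empty entries so the modeled state does not keep dead keys around.
            namesByCity.remove(cityId);
        }
        return new TreeSet<>(names);
    }
}
```

Calling apply("ADD", "city-1", "Rome") and then apply("DEL", "city-1", "Rome") leaves no entry for city-1. The returned copy is what the operator could publish downstream or write to a sink.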

Michael



Re: Streaming and batch jobs together

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Flavio,

If I understand correctly, you have a set of keys which evolves in two ways:
	- keys may be added/deleted
	- values associated with the keys can also be updated.

If this is the case, you can use a streaming job that:
	1. has the stream of events (ADD/DELETE...) as its source
	2. has, after the source, a flatmap that reads the initial set of keys in its open() method.
		As soon as it sees the first element from the stream, it emits all the keys it read in open(), and then the element itself
			(this guarantees that you see the initial set of keys before the change events, if this is important. If not, skip this step)
	3. then has a keyBy that partitions the elements by key (e.g. countryId)
	4. then has a process function that takes care of deduplicating the initial keys and updating the state associated with each key.
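
Steps 2 and 4 above can be sketched without the Flink API to show the intended ordering and deduplication. Everything named here is hypothetical: the "INIT" marker, the String[] record layout {operation, key}, and both classes. In a real job step 2 would presumably be a rich flatmap function and step 4 a process function on the keyed stream, with the HashSet replaced by keyed state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Step 2 (model): emit the initial keys once, before the first change event.
class BootstrapThenEvents {
    private final List<String> initialKeys; // loaded once, as open() would do
    private boolean flushed = false;

    BootstrapThenEvents(List<String> initialKeys) {
        this.initialKeys = new ArrayList<>(initialKeys);
    }

    void flatMap(String[] event, Consumer<String[]> out) {
        if (!flushed) {
            for (String key : initialKeys) {
                out.accept(new String[] {"INIT", key}); // hypothetical marker record
            }
            flushed = true;
        }
        out.accept(event);
    }
}

// Step 4 (model): deduplicate keys and track which ones are currently live.
class DedupingKeyTracker {
    private final Set<String> liveKeys = new HashSet<>();

    // Returns true if the record changed the set of live keys.
    boolean process(String[] record) {
        String op = record[0];
        String key = record[1];
        switch (op) {
            case "INIT":
            case "ADD":
                return liveKeys.add(key);    // false when the key is already known
            case "DELETE":
                return liveKeys.remove(key); // false when the key was not present
            default:
                throw new IllegalArgumentException("Unknown operation: " + op);
        }
    }

    Set<String> liveKeys() {
        return new HashSet<>(liveKeys);
    }
}
```

One reason the deduplication matters: if the flatmap runs with parallelism greater than one, each parallel instance would read and emit the same initial keys, so the downstream step has to tolerate repeats.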

Does this sound like a good starting point?

Thanks,
Kostas

