Posted to user@flink.apache.org by Jakes John <ja...@gmail.com> on 2017/08/18 03:05:52 UTC

Efficient grouping and parallelism on skewed data

Can someone help me figure out how to implement this in Flink?

I have to create a pipeline Kafka -> Flink -> Elasticsearch. I have
high-throughput data coming into Kafka. All messages in Kafka have a key
called 'id' whose value is an integer that ranges from 1 to N. N is
dynamic, with a maximum value of 100. The number of messages across
different ids is drastically different. For example, the number of
incoming messages with id 10 can be 500 times the number of incoming
messages with id 11.
One requirement is that messages with a particular id have to be written
to a corresponding Elasticsearch index. E.g., messages with id 1 are
written to Elasticsearch index 1, messages with id 2 are written to
Elasticsearch index 2, and so on. In other words, there will be at most
100 Elasticsearch indices.

I have control over Kafka. I can make sure that messages are written to
a single topic, or that messages are written to separate topics based on
their ids. The only requirement is that messages end up in the indices
that correspond to their ids.

1. What are the possible ways that I can achieve this in Flink?
2. If I use a single Kafka topic and a single Flink job, what is the
best way to group ids in this case and set parallelism according to the
distribution of the data? The parallelism required to write into ES is
going to be different for different ids (as I said earlier, the
distribution of data across ids is drastically different).
3. A Kafka topic per id, with a topology per id, looks ugly and too
resource-intensive; some ids have very little data. What is the best way
to do this if we were to choose this option?

Re: Efficient grouping and parallelism on skewed data

Posted by Jakes John <ja...@gmail.com>.
Thanks for your reply.

I don't have any special aggregation. My only requirement is: for every
message in Kafka with a particular id, write it into the corresponding
index in Elasticsearch. (I might need to enrich each message before
writing into ES, but there are no aggregations on the incoming stream.)
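
In other words, the topology I have in mind is just something like the
following (enrich() is a placeholder for whatever per-message enrichment
I may need, and esSinkPickingIndexById for a sink that routes by id):

    DataStream<Message> stream = env.addSource(kafkaConsumer);

    stream
        .map(m -> enrich(m))              // optional enrichment, no aggregation
        .addSink(esSinkPickingIndexById); // one index per id on the ES side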

1. Can you please explain how to do this? ("split the stream, splitting
out ids that you know to have higher throughput, and pipeline that split
stream to an Elasticsearch Sink with higher parallelism") Sorry, I
couldn't quite follow. Do you have an example I could look at?

2. If I don't know the throughput of each stream beforehand in order to
configure parallelism, is there a way to do this dynamically? E.g., if
the throughput of a stream gets higher at some instant, configure more
parallelism for its ElasticsearchSink, and less parallelism for the
lower-throughput streams.



On Thu, Aug 17, 2017 at 9:38 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi John,
>
> Do you need to do any sort of grouping or aggregation on the keys? Or
> are you simply using Flink to route the Kafka messages to different
> Elasticsearch indices?
>
> For the following I’m assuming the latter:
> If there’s no need for aggregate computation per key, what you can do
> is simply pipeline the input stream directly to the Elasticsearch sink.
> The Flink Elasticsearch Sink API allows you to route each individual
> incoming record to a different index.
> If you want to have more Elasticsearch sink instances for a specific id,
> what you can do is split the stream, splitting out ids that you know to
> have higher throughput, and pipeline that split stream to an Elasticsearch
> Sink with higher parallelism.
>
> Gordon
>

Re: Efficient grouping and parallelism on skewed data

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi John,

Do you need to do any sort of grouping or aggregation on the keys? Or are you simply using Flink to route the Kafka messages to different Elasticsearch indices?

For the following I’m assuming the latter:
If there’s no need for aggregate computation per key, what you can do is simply pipeline the input stream directly to the Elasticsearch sink.
The Flink Elasticsearch Sink API allows you to route each individual incoming record to a different index.
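For example, here is a rough sketch against the Elasticsearch 2 connector (the Message type and its getId() and toJson() methods are placeholders for your own record type and serialization):

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster-name");   // placeholder cluster name
    config.put("bulk.flush.max.actions", "1000");

    List<InetSocketAddress> addresses = new ArrayList<>();
    addresses.add(new InetSocketAddress(InetAddress.getByName("es-host"), 9300));

    stream.addSink(new ElasticsearchSink<>(config, addresses,
        new ElasticsearchSinkFunction<Message>() {
            @Override
            public void process(Message element, RuntimeContext ctx, RequestIndexer indexer) {
                // choose the target index per record, based on the message id
                indexer.add(Requests.indexRequest()
                        .index("index-" + element.getId())   // e.g. id 1 -> "index-1"
                        .type("message")
                        .source(element.toJson()));
            }
        }));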
If you want to have more Elasticsearch sink instances for a specific id, what you can do is split the stream, splitting out ids that you know to have higher throughput, and pipeline that split stream to an Elasticsearch Sink with higher parallelism.
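Roughly like this, using split/select (the "hot"/"cold" names, the parallelism values, and the isHighThroughput() and buildEsSink() helpers are just illustrative):

    SplitStream<Message> split = stream.split(new OutputSelector<Message>() {
        @Override
        public Iterable<String> select(Message value) {
            // route ids known to be high-throughput to the "hot" side
            return Collections.singletonList(
                    isHighThroughput(value.getId()) ? "hot" : "cold");
        }
    });

    // hot ids get their own sink instances with higher parallelism
    split.select("hot").addSink(buildEsSink()).setParallelism(8);
    split.select("cold").addSink(buildEsSink()).setParallelism(2);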

Gordon
