Posted to user@flink.apache.org by Robert Metzger <rm...@apache.org> on 2020/11/04 07:34:04 UTC

Re: Flink kafka - Message Prioritization

Hi Vignesh,

I'm adding Aljoscha to the thread; he might have an idea how to solve this
with the existing Flink APIs. (The closest idea I had was the N-ary stream
operator, but I guess that doesn't support backpressuring individual
upstream operators. Side inputs would be needed for that?)

The only somewhat feasible idea I came up with is implementing your own
Kafka connector, which only makes sense if you don't need any exactly-once
guarantees (or forking the existing Kafka connector in Flink, in which case
you could also get exactly-once).
In this custom Kafka connector you could, conceptually, have two Kafka
consumers, each feeding messages into its own bounded queue. A third thread
always empties the priority queue first and takes from the other queue only
when the priority queue is empty.
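
A minimal sketch of that queue hand-off in plain Java, with no Flink or
Kafka dependencies. The class name PriorityDrainer and its methods are
hypothetical, chosen only for illustration; in a real connector the two
put methods would be called by the two Kafka polling threads, and next()
by the thread that emits records downstream. It does not track offsets,
so it gives no exactly-once guarantee.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Flink or Kafka class): two bounded queues, drained by priority. */
final class PriorityDrainer<T> {
    private final BlockingQueue<T> high;
    private final BlockingQueue<T> low;

    PriorityDrainer(int capacity) {
        // Bounded queues: a full queue blocks its producer, which is the backpressure.
        this.high = new ArrayBlockingQueue<>(capacity);
        this.low = new ArrayBlockingQueue<>(capacity);
    }

    /** Called by the thread reading the priority topic; blocks while the queue is full. */
    void putHigh(T msg) throws InterruptedException {
        high.put(msg);
    }

    /** Called by the thread reading the normal topic; blocks while the queue is full. */
    void putLow(T msg) throws InterruptedException {
        low.put(msg);
    }

    /**
     * Returns the next message, always preferring the priority queue.
     * Returns null if neither queue produced a message within the timeout.
     */
    T next(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            T msg = high.poll();          // priority messages first
            if (msg != null) {
                return msg;
            }
            msg = low.poll();             // fall back to normal messages
            if (msg != null) {
                return msg;
            }
            if (System.nanoTime() >= deadline) {
                return null;
            }
            // Both queues were empty: wait briefly on the priority queue, then re-check both.
            msg = high.poll(1, TimeUnit.MILLISECONDS);
            if (msg != null) {
                return msg;
            }
        }
    }
}
```

With this shape, the normal topic gets the full drain rate whenever the
priority queue is empty, and is slowed down only while priority messages
are waiting.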

Best,
Robert


On Thu, Oct 29, 2020 at 1:29 PM Vignesh Ramesh <vi...@gmail.com>
wrote:

> Hi,
>
> I have a Flink pipeline which reads from a Kafka topic, does a map
> operation (builds an Elasticsearch model), and sinks it to Elasticsearch.
>
> *Pipeline-1:*
>
> Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism
> 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)
>
> Now I want some messages to be prioritized (processed quickly, not
> necessarily in any order). I am okay with creating a new topic and placing
> the priority messages in it, or with partition-based buckets (e.g.
> https://github.com/riferrei/bucket-priority-pattern, though I don't think
> that's possible in the Flink Kafka connector, since partition assignment
> is done inside FlinkKafkaConsumerBase).
>
> *I tried the below solution:*
>
> I created another topic (topic2, in which I placed the priority messages)
> and with it a new Flink pipeline:
>
> *Pipeline-2:*
>
> Flink-Kafka-Connector-Consumer(topic2) (parallelism 8) -> Map (parallelism
> 8) -> Flink-Es-connector-Sink(es1) (parallelism 8)
>
> But the problem is, I want to consume topic2 as soon as possible, and I can
> accept a delay/slowness in topic1 because of that. If there is no message in
> topic2, then topic1 should be given more priority. But in the above case
> both pipelines are processed equally. Increasing the parallelism of
> pipeline-2 to a big number doesn't help, because when there is no message
> in topic2, topic1 is still very slow (the parallelism assigned to topic2 is
> wasted).
>
> How can I achieve this using the Flink Kafka connector? Is it possible to
> achieve it in any other way?
>
>
> Regards,
>
> Vignesh
>

Re: Flink kafka - Message Prioritization

Posted by Aljoscha Krettek <al...@apache.org>.
I'm afraid there's nothing in Flink that would make this possible right now.

Have you considered whether this would be possible using the vanilla
Kafka Consumer APIs? I'm not sure that it's possible to read messages
with prioritization using those APIs.

Best,
Aljoscha
