Posted to dev@kafka.apache.org by na...@gmail.com, na...@gmail.com on 2019/03/23 20:57:58 UTC

Re: [VOTE] KIP-349 Priorities for Source Topics


On 2019/01/28 02:26:31, nick@afshartous.com wrote: 
> Hi Sönke,
> 
> Thanks for taking the time to review.  I’ve put KIP-349 into hibernation.  
> 
> Thanks also to everyone who participated in the discussion.
> 
> Best regards,
> --
>       Nick
> 
> > On Jan 25, 2019, at 5:51 AM, Sönke Liebau <so...@opencore.com.INVALID> wrote:
> > 
> > a bit late to the party, sorry. I recently spent some time looking
> > into this / a similar issue [1].
> > After some investigation and playing around with settings I think that
> > the benefit that could be gained from this is somewhat limited and
> > probably outweighed by the implementation effort.
> > 
> > The consumer internals are already geared towards treating partitions
> > fairly so that no partition has to wait an undue amount of time and
> > this can be further tuned for latency over throughput. Additionally,
> > if this is a large issue for someone, there is always the option of
> > having a dedicated consumer reading only from the control topic, which
> > would mean that messages from that topic are received "immediately".
> > For a Kafka Streams job it would probably make sense to create two
> > input streams and then merge them as a first step.
> > 
> > I think with these knobs a fairly large amount of flexibility can be
> > achieved so that there is no urgent need to implement priorities.
> > 
> > So my personal preference would be to set this KIP to dormant for now.
> 
Hello Nick,

I'm extremely new to Kafka, but I was attempting to set up a per-topic priority application, and ended up finding this thread. I'm having difficulty seeing how one can implement it with pause/resume. Would you elaborate?

Since those operations are per-partition, and when you stop a partition, it attempts to re-balance, I would need to stop all partitions. Even then, it would try to finish the current transactions instead of immediately putting it on hold and processing other topics. 

It also looks like in order to determine if I had received messages from the pri-1 topic, I would need to loop through all records, and ignore those that weren't pri-1 until a poll failed to retrieve any, which seems like it would screw up the other topics.

Thank you,

Nathan

Re: [VOTE] KIP-349 Priorities for Source Topics

Posted by Sönke Liebau <so...@opencore.com.INVALID>.
Hi Nathan,

I have a couple of remarks/questions about your mail, if I may.

First of all, the javadoc for the pause operation of KafkaConsumer states:
"Suspend fetching from the requested partitions. Future calls to
poll(Duration)
<https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration->
will not return any records from these partitions until they have been
resumed using resume(Collection)
<https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection->.
Note that this method does not affect partition subscription. In
particular, it does not cause a group rebalance when automatic assignment
is used." [1]
You mentioned that "those operations" cause a rebalance; can you perhaps
elaborate on that some more?

Second, you state that "it would try to finish the current transactions",
which confuses me a little as well, since the consumer is not really aware
of transactions in a meaningful way. Or does "transaction" in this case
refer to your last call to poll()?

Have you looked into splitting your subscription across two consumers, one
for high-priority topics and one for low(er)-priority topics? Unless you are
looking for a dynamic, multi-tier priority system across many topics, that
might be your best bet. This works quite well for scenarios where you have
one topic that acts as a control plane (think start/stop-processing
messages) and a second topic that contains the actual data.

Best regards,
Sönke






[1]
https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#pause-java.util.Collection-


On Sun, Mar 24, 2019 at 2:41 AM nathankski@gmail.com <na...@gmail.com>
wrote:

> Hello Nick,
>
> I'm extremely new to Kafka, but I was attempting to set up a per-topic
> priority application, and ended up finding this thread. I'm having
> difficulty seeing how one can implement it with pause/resume. Would you
> elaborate?
>
> Since those operations are per-partition, and when you stop a partition,
> it attempts to re-balance, I would need to stop all partitions. Even then,
> it would try to finish the current transactions instead of immediately
> putting it on hold and processing other topics.
>
> It also looks like in order to determine if I had received messages from
> the pri-1 topic, I would need to loop through all records, and ignore those
> that weren't pri-1 until a poll failed to retrieve any, which seems like it
> would screw up the other topics.
>
> Thank you,
>
> Nathan
>


-- 
Sönke Liebau
Partner
Tel. +49 179 7940878
OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany

Re: [VOTE] KIP-349 Priorities for Source Topics

Posted by Sönke Liebau <so...@opencore.com.INVALID>.
Hi Colin,

that is definitely a good option and will cover 90% of all use cases
(probably more).

However, strictly speaking, it only addresses one half of the issue, unless I
am mistaken. The internal behavior of the KafkaConsumer (which partition
the fetcher gets data from next and which buffered data is returned on the
next poll) is not affected by this. So records will only "jump the queue"
once they leave the KafkaConsumer; until then they have to queue fairly,
just like the rest of the messages.
Again, this will be sufficient in most cases, but if you want high-priority
messages to actually jump to the front of the queue, you would probably want
to combine both approaches and have one consumer for high-priority topics and
one for the rest, both feeding into the same prioritized queue.
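A minimal sketch of that combined setup, using only the JDK: two feeder threads stand in for the high-priority consumer and the consumer for the remaining topics, and both write into one shared queue from which the processing thread always takes a waiting high-priority record first. As an assumption of this sketch, the prioritized queue is a hypothetical two-lane queue (`TwoLaneQueue`) guarded by one lock rather than a PriorityBlockingQueue, so each lane stays FIFO without a tie-breaking comparator; no Kafka classes are involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical shared queue: the high-priority consumer thread calls putHigh(),
// the consumer for the other topics calls putLow(), and the worker calls take().
final class TwoLaneQueue<T> {
    private final Deque<T> high = new ArrayDeque<>();
    private final Deque<T> low = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    void putHigh(T item) { put(high, item); }

    void putLow(T item) { put(low, item); }

    private void put(Deque<T> lane, T item) {
        lock.lock();
        try {
            lane.addLast(item);
            notEmpty.signal();   // wake a worker blocked in take()
        } finally {
            lock.unlock();
        }
    }

    // Blocks until an item is available; drains the high lane before the low lane.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (high.isEmpty() && low.isEmpty()) {
                notEmpty.await();
            }
            return !high.isEmpty() ? high.pollFirst() : low.pollFirst();
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking variant: returns null when both lanes are empty.
    T poll() {
        lock.lock();
        try {
            return !high.isEmpty() ? high.pollFirst() : low.pollFirst();
        } finally {
            lock.unlock();
        }
    }
}

final class TwoConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        TwoLaneQueue<String> q = new TwoLaneQueue<>();
        // Stand-ins for the two consumers' poll loops (hypothetical; no Kafka involved).
        Thread lowFeeder = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                q.putLow("data-" + i);
            }
        });
        Thread highFeeder = new Thread(() -> q.putHigh("control-stop"));
        lowFeeder.start();
        highFeeder.start();
        lowFeeder.join();
        highFeeder.join();
        // Worker: the control record is taken first, then the data records in order.
        for (int i = 0; i < 4; i++) {
            System.out.println(q.take());
        }
    }
}
```

In a real deployment each feeder would be its own KafkaConsumer with its own poll loop, since a KafkaConsumer must not be shared between threads.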

Best regards,
Sönke

On Mon, Mar 25, 2019 at 5:43 AM Colin McCabe <cm...@apache.org> wrote:

> On Sat, Mar 23, 2019, at 18:41, nathankski@gmail.com wrote:
> > Hello Nick,
> >
> > I'm extremely new to Kafka, but I was attempting to set up a per-topic
> > priority application, and ended up finding this thread. I'm having
> > difficulty seeing how one can implement it with pause/resume. Would you
> > elaborate?
> >
> > Since those operations are per-partition, and when you stop a
> > partition, it attempts to re-balance, I would need to stop all
> > partitions. Even then, it would try to finish the current transactions
> > instead of immediately putting it on hold and processing other topics.
>
> Hi nathankski,
>
> Calling pause() on a partition doesn't trigger a re-balance or try to
> finish the current transactions.  It just means that you won't get more
> records for that partition until you call resume() on it.
>
> >
> > It also looks like in order to determine if I had received messages
> > from the pri-1 topic, I would need to loop through all records, and
> > ignore those that weren't pri-1 until a poll failed to retrieve any,
> > which seems like it would screw up the other topics.
>
> One way to do this would be to have two threads.  The first thread calls
> poll() on the Kafka consumer.  It puts the records it retrieves into a
> PriorityBlockingQueue.  Records from pri-1 get the highest priority within
> the queue.
>
> The second thread retrieves records from the queue.  pri-1 records will
> always be pulled out of the PriorityBlockingQueue ahead of any other
> records, so they will be processed first.
>
> If the priority queue gets too big, you pause partitions until thread 2
> can clear the backlog.  The low-priority partition is paused first.
>
> best,
> Colin
>
> >
> > Thank you,
> >
> > Nathan
> >
>



Re: [VOTE] KIP-349 Priorities for Source Topics

Posted by Colin McCabe <cm...@apache.org>.
On Sat, Mar 23, 2019, at 18:41, nathankski@gmail.com wrote:
> Hello Nick,
> 
> I'm extremely new to Kafka, but I was attempting to set up a per-topic 
> priority application, and ended up finding this thread. I'm having 
> difficulty seeing how one can implement it with pause/resume. Would you 
> elaborate?
> 
> Since those operations are per-partition, and when you stop a 
> partition, it attempts to re-balance, I would need to stop all 
> partitions. Even then, it would try to finish the current transactions 
> instead of immediately putting it on hold and processing other topics. 

Hi nathankski,

Calling pause() on a partition doesn't trigger a re-balance or try to finish the current transactions.  It just means that you won't get more records for that partition until you call resume() on it.
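Given those semantics, strict per-topic priority can be layered on pause()/resume() inside a single poll loop: when a poll returns records from the high-priority topic, pause the lower-priority partitions; once a poll returns nothing from it, resume them. The sketch below keeps only that decision logic so it compiles with the JDK alone; `PriorityPausePolicy` and `PauseAction` are hypothetical names, and the actual pause()/resume() calls on the low-priority TopicPartitions are left to the caller.

```java
// Hypothetical policy object: the polling thread calls afterPoll() once per poll()
// with the number of records that poll returned from the high-priority topic.
// The caller turns the result into KafkaConsumer.pause(...) / resume(...) calls
// on the low-priority TopicPartitions (not shown here; that part needs kafka-clients).
enum PauseAction { PAUSE_LOW, RESUME_LOW, NONE }

final class PriorityPausePolicy {
    private boolean lowPaused = false;

    PauseAction afterPoll(int highPriorityCount) {
        if (highPriorityCount > 0 && !lowPaused) {
            lowPaused = true;
            return PauseAction.PAUSE_LOW;   // high-priority records arrived: stop fetching the rest
        }
        if (highPriorityCount == 0 && lowPaused) {
            lowPaused = false;
            return PauseAction.RESUME_LOW;  // high-priority topic is quiet: resume the rest
        }
        return PauseAction.NONE;            // already in the right state; nothing to call
    }

    boolean lowPaused() {
        return lowPaused;
    }
}
```

This is strict priority: as long as every poll returns something from the high-priority topic, the other topics stay paused. The count can be read with ConsumerRecords.records(topic) instead of filtering the whole batch, and low-priority records already returned by the same poll still have to be processed, because pause() only affects later polls.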

> 
> It also looks like in order to determine if I had received messages 
> from the pri-1 topic, I would need to loop through all records, and 
> ignore those that weren't pri-1 until a poll failed to retrieve any, 
> which seems like it would screw up the other topics.

One way to do this would be to have two threads.  The first thread calls poll() on the Kafka consumer.  It puts the records it retrieves into a PriorityBlockingQueue.  Records from pri-1 get the highest priority within the queue.

The second thread retrieves records from the queue.  pri-1 records will always be pulled out of the PriorityBlockingQueue ahead of any other records, so they will be processed first.
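A minimal, JDK-only sketch of that two-thread design. `Rec` is a hypothetical stand-in for ConsumerRecord and `PriorityBuffer` is an illustrative name; the topic names follow the thread. One detail is added as an assumption: PriorityBlockingQueue makes no ordering guarantee among elements of equal priority, so a sequence number breaks ties and keeps records of the same priority (and hence of the same partition) in the order they were polled.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a ConsumerRecord: topic, payload, and arrival order.
record Rec(String topic, String value, long seq) {}

final class PriorityBuffer {
    private final AtomicLong seq = new AtomicLong();
    private final PriorityBlockingQueue<Rec> queue;

    PriorityBuffer(String highTopic) {
        // Rank 0 for the high-priority topic, 1 for everything else; ties broken by
        // arrival order, because PriorityBlockingQueue does not order equal elements.
        Comparator<Rec> order = Comparator
                .comparingInt((Rec r) -> r.topic().equals(highTopic) ? 0 : 1)
                .thenComparingLong(Rec::seq);
        this.queue = new PriorityBlockingQueue<>(64, order);
    }

    // Thread 1 (the poll() loop) calls this for every record it receives.
    void add(String topic, String value) {
        queue.put(new Rec(topic, value, seq.getAndIncrement()));
    }

    // Thread 2 (the processing loop) blocks here until a record is available.
    Rec take() throws InterruptedException {
        return queue.take();
    }

    // Non-blocking variant: returns null when the queue is empty.
    Rec poll() {
        return queue.poll();
    }

    // Backlog size that the poll() loop can check before polling again.
    int size() {
        return queue.size();
    }
}

final class PriorityBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBuffer buf = new PriorityBuffer("pri-1");
        // Stand-in for thread 1: enqueue one batch as a poll() might return it.
        Thread poller = new Thread(() -> {
            buf.add("pri-2", "a");
            buf.add("pri-2", "b");
            buf.add("pri-1", "urgent");
        });
        poller.start();
        poller.join();
        // The main thread plays thread 2: the pri-1 record is taken before a and b.
        for (int i = 0; i < 3; i++) {
            System.out.println(buf.take().value());
        }
    }
}
```

The queue is unbounded, so the polling thread needs its own signal (such as size()) to stop fetching when processing falls behind.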

If the priority queue gets too big, you pause partitions until thread 2 can clear the backlog.  The low-priority partition is paused first.
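The back-pressure rule can be sketched as a small controller that the polling thread consults after each poll(). The `Backpressure` class, the topic-to-rank map, and the high/low watermarks are hypothetical; the returned topic stands in for the TopicPartition collection that would be passed to KafkaConsumer.pause() or resume(). Those calls stay on the polling thread, because KafkaConsumer is not safe for multi-threaded access.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical back-pressure controller, consulted by the polling thread after each poll().
final class Backpressure {
    private final List<String> pauseOrder;                    // lowest priority first
    private final Deque<String> paused = new ArrayDeque<>();  // last paused is resumed first
    private final int highWater;
    private final int lowWater;

    // priorities: topic -> rank, where a larger rank means lower priority (pri-1 = 1, pri-2 = 2, ...).
    Backpressure(Map<String, Integer> priorities, int highWater, int lowWater) {
        this.pauseOrder = new ArrayList<>(priorities.keySet());
        this.pauseOrder.sort(Comparator.comparingInt((String t) -> priorities.get(t)).reversed());
        this.highWater = highWater;
        this.lowWater = lowWater;
    }

    // Returns the next topic to pause while the backlog is above the high-water mark.
    Optional<String> toPause(int queueSize) {
        if (queueSize <= highWater) {
            return Optional.empty();
        }
        for (String topic : pauseOrder) {
            if (!paused.contains(topic)) {
                paused.push(topic);
                return Optional.of(topic);
            }
        }
        return Optional.empty();   // everything is already paused
    }

    // Returns the next topic to resume once the backlog has drained below the low-water mark.
    Optional<String> toResume(int queueSize) {
        if (queueSize >= lowWater || paused.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(paused.pop());
    }
}
```

The gap between the two watermarks keeps the consumer from flapping between pause() and resume() on consecutive polls. While partitions are paused, the polling thread still has to keep calling poll() (which simply returns nothing for them) so that it stays within max.poll.interval.ms and remains in the group.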

best,
Colin

> 
> Thank you,
> 
> Nathan
>