Posted to user@flink.apache.org by wang xuchen <be...@gmail.com> on 2019/06/21 03:48:29 UTC

Flink Kafka consumer with low latency requirement

Dear Flink experts,

I am experimenting with Flink for a use case that has tight latency
requirements.

A Stack Overflow article suggests that I can use setParallelism(n) to process
a Kafka partition in a multi-threaded way. My understanding is that there is
still one Kafka consumer per partition, but by using setParallelism, I can
spin up multiple worker threads to process the messages read from the
consumer.
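Here is a rough plain-Java sketch of that understanding. It is not Flink code. Each Kafka partition is read by exactly one source subtask, so raising the source's parallelism above the partition count only adds idle subtasks. The modulo rule below is a simplifying assumption. The actual assignment in Flink's Kafka connector may differ in detail, but it also gives each partition to a single subtask.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {
    // Simplifying assumption: partition p is read by source subtask p % parallelism.
    // Whatever the exact rule, each partition ends up with exactly one reader.
    public static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // One partition, source parallelism 2: subtask 1 has nothing to read.
        System.out.println(assign(1, 2)); // [[0], []]
    }
}
```

Under this model, extra parallelism helps only when it is set on the downstream processing operator, not on the source.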

And according to Fabian's comments in this link:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
Flink is able to manage the offset correctly (commit in the right order).

Here is my question. Let's say there is a Kafka topic with only one
partition, and I set up a consumer with setParallelism(2). Hypothetically,
the worker threads call out to a REST service that may periodically become
slow or get stuck. I want to make sure that the consumer as a whole keeps
making progress even in the face of a 'slow worker'. In other words, I'd like
the fast worker to be able to accumulate multiple pending but uncommitted
offsets even when the other worker is stuck. Is there such a knob to tune in
Flink?

In my own experiment, I used the Kafka consumer group tool to monitor the
offset lag. As soon as one worker thread is stuck, the other cannot make any
progress either. I really want the fast worker to keep making progress to a
certain extent. For this use case, exactly-once processing is not required.

Thanks for helping.
Ben

Re: Flink Kafka consumer with low latency requirement

Posted by xwang355 <be...@gmail.com>.
I posted a related observation in a separate thread:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-making-synchronize-call-might-choke-the-whole-pipeline-tc28383.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Kafka consumer with low latency requirement

Posted by xwang355 <be...@gmail.com>.
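import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

public class StragglerTest {
	// ids of the worker threads seen so far; shared by all parallel subtasks in the JVM
	private static final ConcurrentSkipListSet<Long> sortedTid = new ConcurrentSkipListSet<>();
	private static final AtomicInteger counter = new AtomicInteger();

	private static void doWork(long tid) throws InterruptedException {
		sortedTid.add(tid); // no-op if tid is already registered

		// simulate a straggler: make the thread with the lowest tid a slow processor
		if (sortedTid.first() == tid) {
			if (counter.getAndIncrement() == 1000) {
				// 60 s; Thread.sleep(60,000) would call sleep(millis, nanos) and sleep only 60 ms
				Thread.sleep(60_000);
			}
			Thread.sleep(20);
		} else {
			Thread.sleep(20);
		}
	}
}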

Just for testing purposes, the thread with the lowest tid sleeps 60 s when the
counter reaches 1000. Will 'sleep' cause any issues?

Re: Flink Kafka consumer with low latency requirement

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

What kind of function do you use to implement the operator that has the
blocking call?
Did you have a look at the AsyncIO operator? It was designed for exactly
such use cases.
It issues multiple asynchronous requests to an external service and waits
for the response.
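In Flink itself, this is done with AsyncDataStream.unorderedWait (or orderedWait) and an AsyncFunction. The sketch below does not use Flink. It models the idea in plain Java under assumed numbers. At most `capacity` requests are in flight at once, and responses are collected in the order they complete. As a result, one stuck request ties up a single slot rather than the whole stream. `callService` and its delays are hypothetical stand-ins for the REST call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class AsyncCallSketch {
    // Hypothetical stand-in for the REST call: request 0 plays the "stuck" one.
    static String callService(int id) {
        try {
            Thread.sleep(id == 0 ? 1000 : 10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "resp-" + id;
    }

    // Issues `requests` calls with at most `capacity` in flight and returns the
    // responses in completion order (like unordered emission).
    public static List<String> run(int requests, int capacity) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(capacity);
        Semaphore slots = new Semaphore(capacity);
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            slots.acquire(); // waits only when `capacity` requests are already pending
            final int id = i;
            CompletableFuture.supplyAsync(() -> callService(id), pool)
                    .whenComplete((response, error) -> {
                        completed.add(error == null ? response : "failed-" + id);
                        slots.release();
                        done.countDown();
                    });
        }
        done.await();
        pool.shutdown();
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> order = run(20, 4);
        // The slow response arrives last; the other 19 were not held up by it.
        System.out.println(order.get(order.size() - 1)); // resp-0
    }
}
```

The capacity bound plays the role of the operator's capacity parameter. Without it, a stuck service would let pending requests pile up without limit.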

Best, Fabian

In reply to xwang355 <be...@gmail.com>, Mon, 24 Jun 2019 at 17:01.

Re: Flink Kafka consumer with low latency requirement

Posted by xwang355 <be...@gmail.com>.
Fabian,

Does the stack trace below look like a deadlock?

        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:539)
        - locked <0x00000007baf84040> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)

Re: Flink Kafka consumer with low latency requirement

Posted by xwang355 <be...@gmail.com>.
Fabian,

Thank you for replying.

If I understand your previous comment correctly, I set up a consumer with
parallelism 1 and connected a worker task with parallelism 2.

If worker thread one makes a blocking call and is stuck for 60 s, the consumer
thread should continue fetching from the partition and feeding thread two.
From my reading of the Flink documentation, if checkpointing is enabled, the
consumer should commit its own internal state back to Kafka to show progress
to external monitoring tools.

If that's the case, checkpoints should all succeed during the 60 s when
thread one is stuck, and thread two should keep chugging along merrily, even
though the highest committed offset is one less than the offset held by
thread one. After 60 s, I should see a huge jump in the monitoring tool,
because thread one has released its offset and all offsets consumed by
thread two during those 60 s can be committed.
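The behaviour expected here amounts to committing only the contiguous prefix of finished offsets. The hypothetical tracker below models that expectation in plain Java. It is not how Flink's Kafka consumer works. It follows Kafka's convention that the committed offset is the next offset to read. Under that convention, "one less than the offset held by thread one" corresponds to committing thread one's offset.

```java
import java.util.TreeSet;

public class ContiguousCommitTracker {
    // Hypothetical tracker, not Flink's mechanism: offsets may finish out of
    // order, but only the contiguous prefix of finished offsets is committable.
    private final TreeSet<Long> done = new TreeSet<>();
    private long nextToCommit; // lowest offset not yet known to be finished

    public ContiguousCommitTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called by a worker when it has finished processing `offset`.
    public synchronized void markDone(long offset) {
        done.add(offset);
        // Advance past every finished offset that is now contiguous.
        while (done.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    // Kafka convention: the committed offset is the next offset to read.
    public synchronized long committableOffset() {
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousCommitTracker t = new ContiguousCommitTracker(0);
        // The slow worker holds offset 0; the fast worker finishes 1..5.
        for (long o = 1; o <= 5; o++) {
            t.markDone(o);
        }
        System.out.println(t.committableOffset()); // 0: stuck behind offset 0
        t.markDone(0); // the slow worker finally returns
        System.out.println(t.committableOffset()); // 6: the "huge jump"
    }
}
```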

However, what I observed is that as soon as thread one gets stuck,
checkpointing is choked, the consumer thread stops feeding thread two, and
the whole pipeline becomes stagnant.
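A likely explanation is backpressure. Flink hands records between tasks through bounded buffers, and round-robin distribution sends every other record to the stuck subtask. Once that subtask's buffer is full, the upstream task blocks on the next record for it and stops feeding the healthy subtask too. With the default aligned checkpoints, the barrier also sits behind records the stuck subtask has not processed yet. As a result, no checkpoint completes and no offsets are committed to Kafka. The plain-Java model below shows only the buffering part. The buffer size of 4 and the two-worker setup are assumptions, not Flink settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureSketch {
    // Returns how many records a round-robin "consumer" forwards within
    // `runMillis` while worker 0 never reads its bounded input queue.
    public static int forwardedWhileStuck(int bufferSize, long runMillis) throws InterruptedException {
        List<BlockingQueue<Long>> inputs = new ArrayList<>();
        for (int w = 0; w < 2; w++) {
            inputs.add(new ArrayBlockingQueue<>(bufferSize));
        }
        AtomicInteger forwarded = new AtomicInteger();

        // Worker 1 drains its queue; worker 0 is "stuck" and never reads.
        Thread fastWorker = new Thread(() -> {
            try {
                while (true) inputs.get(1).take();
            } catch (InterruptedException e) { /* shut down */ }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (long offset = 0; ; offset++) {
                    // Round-robin: record n goes to worker n % 2; put() blocks when that queue is full.
                    inputs.get((int) (offset % 2)).put(offset);
                    forwarded.incrementAndGet();
                }
            } catch (InterruptedException e) { /* shut down */ }
        });
        fastWorker.setDaemon(true);
        consumer.setDaemon(true);
        fastWorker.start();
        consumer.start();
        Thread.sleep(runMillis);
        consumer.interrupt();
        fastWorker.interrupt();
        return forwarded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stalls after 8 records (4 per worker), although worker 1 is idle.
        System.out.println(forwardedWhileStuck(4, 200)); // 8
    }
}
```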

Could you please help me understand this behavior?

Thanks again.
Ben 

Re: Flink Kafka consumer with low latency requirement

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ben,

Flink's Kafka consumers track their progress independently of any worker.
They keep track of the reading offset for themselves (committing progress
to Kafka is optional and only necessary to have progress monitoring in
Kafka's metrics).
As soon as a consumer reads and forwards an event, it is considered to be
read. This means that the progress of the downstream worker does not
influence the progress tracking at all.
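Here is a minimal plain-Java model of that statement, using a hypothetical `SourceOffsetSketch` class. The source advances its read position as soon as it forwards a record. That position is what it reports for a checkpoint, regardless of how much of the forwarded data a worker has actually processed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongConsumer;

public class SourceOffsetSketch {
    // Hypothetical model of a source subtask: it remembers only how far it has
    // read, not whether downstream workers have finished with the records.
    private long nextOffset;

    public SourceOffsetSketch(long startOffset) {
        this.nextOffset = startOffset;
    }

    // Reads the record at nextOffset, hands it downstream, and counts it as read.
    public void readAndForward(LongConsumer downstream) {
        downstream.accept(nextOffset);
        nextOffset++;
    }

    // The position the source would report for a checkpoint.
    public long snapshotOffset() {
        return nextOffset;
    }

    public static void main(String[] args) {
        SourceOffsetSketch source = new SourceOffsetSketch(100);
        Queue<Long> workerBacklog = new ArrayDeque<>(); // records a slow worker has not processed
        for (int i = 0; i < 3; i++) {
            source.readAndForward(offset -> workerBacklog.add(offset));
        }
        // Records 100..102 are still unprocessed, yet the source reports 103.
        System.out.println(source.snapshotOffset() + " " + workerBacklog.size()); // 103 3
    }
}
```

Note that the offset is committed to Kafka only after the whole checkpoint completes. Completion still requires downstream tasks to process records up to the checkpoint barrier.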

In case of a topic with a single partition, you can use a consumer with
parallelism 1 and connect a worker task with a higher parallelism to it.
The single consumer task will send the read events round-robin to the
worker tasks.

Best, Fabian

In reply to wang xuchen <be...@gmail.com>, Fri, 21 Jun 2019 at 05:48.

>
> Dear Flink experts,
>
> I am experimenting Flink for a use case where there is a tight latency
> requirements.
>
> A stackoverflow article suggests that I can use setParallism(n) to process
> a Kafka partition in a multi-threaded way. My understanding is there is
> still one kafka consumer per partition, but by using setParallelism, I can
> spin up multiple worker threads to process the messages read from the
> consumer.
>
> And according to Fabian`s comments in this link:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Does-Flink-Kafka-connector-has-max-pending-offsets-concept-td28119.html
> Flink is able to manage the offset correctly (commit in the right order).
>
> Here is my questions, let`s say there is a Kafka topic with only one
> partition, and I setup a consumer with setParallism(2). Hypothetically,
> worker threads call out to a REST service which may get slow or stuck
> periodically. If I want to make sure that the consumer overall is making
> progress even in face of a 'slow woker'. In other words, I`d like to have
> multiple pending but uncommitted offsets by the fast worker even when the
> other worker is stuck. Is there such a knob  to tune in Flink?
>
> From my own experiment, I use Kafka consume group tool to to monitor the
> offset lag,  soon as one worker thread is stuck, the other cannot make any
> progress either. I really want the fast worker still progress to certain
> extend. For this use case, exactly once processing is not required.
>
> Thanks for helping.
> Ben
>
>
>