Posted to user@flink.apache.org by wang xuchen <be...@gmail.com> on 2019/06/23 04:31:45 UTC

Does making a synchronous call choke the whole pipeline?

Dear Flink experts,

I am testing the following code:

        env.enableCheckpointing(2000);

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("kf-events", new SimpleStringSchema(), properties2);

        ...

        messageStream.rebalance().map(new MapFunction<String, String>() {

            ...

            @Override
            public String map(String value) throws Exception {
                long tid = Thread.currentThread().getId();
                doWork(tid);
                ...
            }
        }).setParallelism(2).print();

By using setParallelism(2), I expect to have two threads processing records
from the same Kafka partition. However, if one thread is choked in doWork,
the other thread cannot make progress either. Consumer offset lag starts to
build up as a result.

What I'd like to achieve is that the 'healthy' thread continues to make
progress to some extent (not forever): the choked thread keeps the earlier
offset from being committed, but the subsequent records in the partition
are still processed.

If the unhealthy thread eventually aborts, it is OK to reprocess those
offsets. For example, if doWork does credit card fraud checking, I don't
want one bad transaction to hold up all customers' credit card usage.
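
To make the desired behaviour concrete, here is a minimal sketch of
bounding how long one record may block. It uses only the JDK, not any
Flink API. The class BoundedWork, the method mapWithTimeout, the sleep
standing in for the real doWork, and the "timeout:" marker are all
hypothetical and chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWork {

    // Hypothetical stand-in for doWork: blocks for blockMillis, then returns a result.
    public static String doWork(String value, long blockMillis) throws InterruptedException {
        Thread.sleep(blockMillis);
        return "checked:" + value;
    }

    // Runs doWork on a helper thread and gives up after timeoutMillis,
    // so a single slow record cannot block the caller indefinitely.
    public static String mapWithTimeout(ExecutorService pool, String value,
                                        long blockMillis, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        Callable<String> task = () -> doWork(value, blockMillis);
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the stuck call
            return "timeout:" + value; // assumption: caller retries or reprocesses later
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Fast record: completes well inside the timeout.
            System.out.println(mapWithTimeout(pool, "tx-1", 10, 1000));
            // Slow record: abandoned after 100 ms instead of blocking for 5 s.
            System.out.println(mapWithTimeout(pool, "tx-2", 5000, 100));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

This only caps the time spent per record inside the calling thread. It
says nothing about how offsets are committed, which is what question 2
below asks about.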

My questions are:
1) Is making doWork a synchronous call bad practice?
2) Is there a parameter I can tune to limit the number of uncommitted
offsets? This would dictate how many offsets might be reprocessed.

Thanks a lot
Ben