Posted to user@flink.apache.org by TechnoMage <ml...@technomage.com> on 2018/08/17 00:59:44 UTC
Kafka connector issue
I have seen this in the past and am running into it again.
I have a Kafka consumer that is not getting all the records from the topic. Kafka confirms there are 300k messages in each partition, but Flink only sees a total of 8,000 records at the source.
Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.
Properties props = new Properties();
props.setProperty("bootstrap.servers", servers);
props.setProperty("group.id", UUID.randomUUID().toString());
props.setProperty("flink.partition-discovery.interval-millis", "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

DataStream<String> ds = consumers.get(a.eventType);
if (ds == null) {
    FlinkKafkaConsumer011<String> cons =
        new FlinkKafkaConsumer011<String>(topic, new SimpleStringSchema(), props);
    cons.setStartFromEarliest();
    ds = env.addSource(cons).name(et.name).rebalance();
    consumers.put(a.eventType, ds);
}
I am about to rip out the Kafka consumer and build a source using the plain Kafka client library, which has been 100% reliable for me when working with Kafka. Any pointers welcome.
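In case it helps, a rough sketch of the kind of source I have in mind follows. It deliberately skips checkpointing and offset management, and topic and servers are just placeholders for my own values:

// Rough sketch of a custom source wrapping the plain Kafka client library.
// No checkpointing or offset management; topic and servers are placeholders.
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleKafkaSource implements SourceFunction<String> {

    private final String topic;
    private final String servers;
    private volatile boolean running = true;

    public SimpleKafkaSource(String topic, String servers) {
        this.topic = topic;
        this.servers = servers;
    }

    @Override
    public void run(SourceContext<String> ctx) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("group.id", UUID.randomUUID().toString());
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Emit under the checkpoint lock so emission stays consistent
                    // with any state updates added later.
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record.value());
                    }
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

It would be used as env.addSource(new SimpleKafkaSource(topic, servers)); note that a plain SourceFunction like this runs at parallelism 1 unless it also implements ParallelSourceFunction.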
Michael
Re: Kafka connector issue
Posted by TechnoMage <ml...@technomage.com>.
It looks like it is some issue with backpressure, as the same behavior happens when I use the client library in a custom source.
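For what it is worth, one quick check I am running (just a sketch; ds is the DataStream from my earlier snippet) is to point the source at a discarding sink so no downstream operator can push back. If all 300k records show up there, the source is fine and the problem is further down the job graph:

// Throughput check: no real downstream operators, so nothing can apply backpressure.
// "ds" is the DataStream<String> built in the earlier snippet.
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

ds.addSink(new DiscardingSink<String>()).name("throughput-check");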
Michael
> On Aug 16, 2018, at 6:59 PM, TechnoMage <ml...@technomage.com> wrote:
>
> I have a Kafka consumer that is not getting all the records from the topic. Kafka confirms there are 300k messages in each partition, but Flink only sees a total of 8,000 records at the source.
>
> Kafka is 2.0, Flink is 1.4.2, and the connector is FlinkKafkaConsumer011.
>
> I am about to rip out the Kafka consumer and build a source using the plain Kafka client library, which has been 100% reliable for me when working with Kafka. Any pointers welcome.
>
> Michael
>