Posted to user@flink.apache.org by "alfredo.vasquez.spglobal.com via user" <us...@flink.apache.org> on 2022/09/01 20:38:29 UTC

Kafka source stops consuming messages from topic after some minutes

Hello,

I'm using flink-connector-kafka version 1.15.2 to consume messages from a Kafka topic with 3 partitions. The resulting stream is later connected to another Kafka source and then processed in a BroadcastProcessFunction.

The Kafka source is created as follows:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "600000");   // 10 minutes
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");  // 1 second
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000"); // 15 minutes

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
              .setBootstrapServers("localhost:9092")
              .setTopics("mytopic")
              .setGroupId("group-id")
              .setClientIdPrefix("client-id")
              .setStartingOffsets(OffsetsInitializer.latest())
              .setProperty("security.protocol", "SSL")
              .setProperty("partition.discovery.interval.ms", "300000")
              .setProperties(properties)
              .setDeserializer(new StringDeserializationSchema())
              .build();

DataStreamSource<String> myStreamSource =
              env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "myStreamSource");
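
For reference, the downstream wiring described above (connecting this stream with a second, broadcast Kafka source and processing both in a BroadcastProcessFunction) typically looks roughly like the following sketch. The control source, the state descriptor and the function class names here are hypothetical placeholders, not code from the actual job:

// Minimal sketch of the broadcast-connect pattern described above;
// "controlSource" and "MyBroadcastFunction" are hypothetical placeholders.
MapStateDescriptor<String, String> stateDescriptor =
        new MapStateDescriptor<>("broadcast-state",
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<String> controlStream =
        env.fromSource(controlSource, WatermarkStrategy.noWatermarks(), "controlSource")
           .broadcast(stateDescriptor);

myStreamSource
        .connect(controlStream)
        .process(new MyBroadcastFunction())  // extends BroadcastProcessFunction<String, String, String>
        .print();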


Then I start sending 10 messages per second to the topic. The consumer starts reading messages, but after some minutes it stops consuming from the topic; for example, if I send 3000 messages to the topic, only around 1200 to 2000 are consumed.
I do not get any exception or error message in the task manager logs, the job does not restart, and the backpressure is around 15 to 20% while it is reading messages and then drops to 0%.

Please let me know if you have any suggestions, or what additional information you need, to help fix this issue.

Best.


RE: Kafka source stops consuming messages from topic after some minutes

Posted by "alfredo.vasquez.spglobal.com via user" <us...@flink.apache.org>.
Hello, thank you for your response.
Just an update on this issue: it was not a problem with the Flink job. I found out it was related to this fluentd issue (https://github.com/fluent/fluentd/issues/3614), which is why I was not seeing all the logs I expected.
Checking the output Kafka topic, I can see all messages were processed correctly.

Regards,


Re: Kafka source stops consuming messages from topic after some minutes

Posted by Martijn Visser <ma...@apache.org>.
My initial thought is that there's something in your business logic. You're
reading from one Kafka topic, then you're mentioning that it's "connected"
to another Kafka topic. What type of business logic are you executing? Are
you joining data, looking things up etc? My suspicion would be that in this
process there's an issue which causes that operator to not progress as
quickly, causing the source to pause/stop reading.
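
One way to check that hypothesis (a sketch under the assumption that the suspect logic sits between the source and the sink; the job and sink names below are hypothetical, not from this thread) is to temporarily run the same KafkaSource with only a trivial sink attached and compare how far it reads:

// Hypothetical isolation run: consume with the same KafkaSource but skip the
// broadcast/connect logic, so the raw source throughput can be compared
// against the full pipeline.
DataStreamSource<String> isolated =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "isolationTest");
isolated.print();
env.execute("kafka-source-isolation-test");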
