You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "hongcha (Jira)" <ji...@apache.org> on 2022/10/18 02:31:00 UTC

[jira] [Updated] (FLINK-29674) Apache Kafka Connector‘s “ setBounded” not valid

     [ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

hongcha updated FLINK-29674:
----------------------------
    Summary: Apache Kafka Connector‘s “ setBounded” not valid  (was: Apache Kafka 连接器的“ setBounded”不生效)

> Apache Kafka Connector‘s “ setBounded” not valid
> ------------------------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: hongcha
>            Priority: Major
>
> When I'm using the official Apache Kafka connector, I see that the official means to set kafka's consumption boundary (" setBounded ") are provided, and when my job runs normally (with no bugs), the bounds are valid, and my job will finish consuming bounded data normally. However, when my job fails and I restore it to the checkpoint used during the failure, I find that my job cannot be completed normally and is always running. However, I can see in the log that data has been consumed to the threshold set by me. I don't know if there is a problem with my usage, here is part of my code:
>  
> {code:java}
> //代码占位符
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName,0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build(); {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)