Posted to issues@flink.apache.org by "Fabian Paul (Jira)" <ji...@apache.org> on 2022/11/11 08:33:00 UTC

[jira] [Updated] (FLINK-29674) Apache Kafka Connector's "setBounded" not valid

     [ https://issues.apache.org/jira/browse/FLINK-29674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Paul updated FLINK-29674:
--------------------------------
    Affects Version/s: 1.15.3
                           (was: 1.15.2)

> Apache Kafka Connector's "setBounded" not valid
> -----------------------------------------------
>
>                 Key: FLINK-29674
>                 URL: https://issues.apache.org/jira/browse/FLINK-29674
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.3
>            Reporter: Wei Jiang
>            Priority: Major
>         Attachments: image-2022-10-18-20-38-34-515.png
>
>
> When I use the Kafka connector and set Kafka's consumption boundary with "setBounded", the bound is respected as long as the job runs without failures, and the job finishes. However, when the job fails and I restore it from the checkpoint in use at the time of the failure, the job never completes and keeps running, even though the log shows that data has been consumed up to the boundary I set. I don't know whether there is a problem with my usage; here is part of my code:
>  
> {code:java}
> // Code placeholder
> String topicName = "jw-test-kafka-w-offset-002";
> Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> offsets.put(new TopicPartition(topicName,0), 6L);
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setBootstrapServers("xxx:9092")
>         .setProperties(properties)
>         .setTopics(topicName)
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .setBounded(OffsetsInitializer.offsets(offsets))
>         .build();
> {code}
>  
>  
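For readers following the report, the behavior the reporter expects can be sketched as a small plain-Java model. This is only an illustration under stated assumptions: `BoundedSplitModel`, `Split`, `poll` and `runToCompletion` are hypothetical names and not Flink classes, the stopping offset is modeled as exclusive, and the checkpoint position (offset 4) is made up for the example. It does not reproduce or explain the bug; it only shows that a reader restored from a checkpointed offset with the same stopping offset would be expected to finish at the same bound as an uninterrupted run.

```java
public class BoundedSplitModel {

    /** Hypothetical stand-in for one partition's read state; not a Flink class. */
    static final class Split {
        final long stoppingOffset; // modeled as exclusive: only offsets below it are read
        long nextOffset;           // next offset to read; what a checkpoint would record

        Split(long nextOffset, long stoppingOffset) {
            this.nextOffset = nextOffset;
            this.stoppingOffset = stoppingOffset;
        }

        boolean isFinished() {
            return nextOffset >= stoppingOffset;
        }

        /** Reads up to maxRecords offsets, never crossing the stopping offset. */
        int poll(int maxRecords) {
            int read = 0;
            while (read < maxRecords && !isFinished()) {
                nextOffset++;
                read++;
            }
            return read;
        }
    }

    /** Drains a split in small batches and returns the offset at which it finished. */
    static long runToCompletion(Split split) {
        while (!split.isFinished()) {
            split.poll(2);
        }
        return split.nextOffset;
    }

    public static void main(String[] args) {
        // Uninterrupted run: from the earliest offset (0) to the bound 6 used in the report.
        long clean = runToCompletion(new Split(0L, 6L));

        // Interrupted run: assume a checkpoint at offset 4, then a failure and a restore.
        Split beforeFailure = new Split(0L, 6L);
        beforeFailure.poll(4);
        long checkpointedOffset = beforeFailure.nextOffset;

        // The restored split keeps the same stopping offset as the original one.
        Split restored = new Split(checkpointedOffset, beforeFailure.stoppingOffset);
        long recovered = runToCompletion(restored);

        System.out.println("clean run finished at offset " + clean);
        System.out.println("restored run finished at offset " + recovered);
    }
}
```

In this model both runs stop at offset 6. The report describes the restored job reaching the bound in the logs but never terminating, which is the gap between this expectation and the observed behavior.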



--
This message was sent by Atlassian Jira
(v8.20.10#820010)