Posted to issues@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2022/10/10 13:02:00 UTC

[jira] [Commented] (FLINK-29395) [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard

    [ https://issues.apache.org/jira/browse/FLINK-29395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615105#comment-17615105 ] 

Danny Cranmer commented on FLINK-29395:
---------------------------------------

Merged commit [{{ef93ae4}}|https://github.com/apache/flink/commit/ef93ae4525ea42a87baf77747dd3ffbc007112fd] into master
Merged commit [{{7a1dccd}}|https://github.com/apache/flink/commit/7a1dccd3020ffefe83f1fa80ab70bc3150144640] into release-1.16 

 

> [Kinesis][EFO] Issue using EFO consumer at timestamp with empty shard
> ---------------------------------------------------------------------
>
>                 Key: FLINK-29395
>                 URL: https://issues.apache.org/jira/browse/FLINK-29395
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: 1.12.7, 1.13.6, 1.14.5, 1.15.2
>            Reporter: Hong Liang Teoh
>            Assignee: Hong Liang Teoh
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> *Background*
> The consumer fails when an EFO record publisher uses a timestamp sentinel starting position and the first record batch is empty. This happens because the consumer tries to recalculate the start position from the timestamp sentinel, which is not a supported operation.
> This is the same issue as https://issues.apache.org/jira/browse/FLINK-20088
> *Reproduction Steps*
> Set up an application that consumes from Kinesis with the following properties, and consume from an empty shard:
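> {code:java}
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.Properties;
> import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
> Properties consumerConfig = new Properties();
> String format = "yyyy-MM-dd'T'HH:mm:ss";
> String date = new SimpleDateFormat(format).format(new Date());
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
> consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
> // Use the enhanced fan-out (EFO) record publisher
> consumerConfig.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, "EFO"); {code}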
> *Error*
> {code:java}
> java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:115)
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.fromSequenceNumber(StartingPosition.java:91)
> 	at org.apache.flink.streaming.connectors.kinesis.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:72)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:356)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:188)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:154)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:123)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
> 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:829) {code}
>  
> *Solution*
> This is fixed by reusing the existing timestamp starting position when the first record batch is empty, instead of recalculating it from the timestamp sentinel.
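
The background and solution above can be modeled in a few lines. This is a hypothetical, simplified sketch, not the connector's code. The class TimestampSentinelSketch, the SENTINEL_* strings, the Position type and the methods continueFrom and nextStartingPosition are all invented for illustration. continueFrom models the failing path, which recalculates the start position from the last sequence number. nextStartingPosition models the fix: when the first batch was empty, it reuses the existing AT_TIMESTAMP starting position.

```java
/**
 * Hypothetical, simplified model of FLINK-29395. The class, constants and
 * methods here are illustrative only; they are NOT the Kinesis connector's API.
 */
public class TimestampSentinelSketch {

    // Hypothetical placeholder "sequence numbers" recorded before any real record is read.
    static final String SENTINEL_LATEST = "SENTINEL_LATEST";
    static final String SENTINEL_EARLIEST = "SENTINEL_EARLIEST";
    static final String SENTINEL_AT_TIMESTAMP = "SENTINEL_AT_TIMESTAMP";

    /** A starting position: a shard iterator type plus an optional value. */
    static final class Position {
        final String iteratorType;
        final String value; // sequence number or timestamp, depending on the type

        Position(String iteratorType, String value) {
            this.iteratorType = iteratorType;
            this.value = value;
        }
    }

    /**
     * Models the pre-fix behaviour: always rebuild the next position from the
     * last sequence number. The AT_TIMESTAMP sentinel carries no timestamp, so
     * it cannot be mapped back to a position and the call throws.
     */
    static Position continueFrom(String lastSequenceNumber) {
        switch (lastSequenceNumber) {
            case SENTINEL_LATEST:
                return new Position("LATEST", null);
            case SENTINEL_EARLIEST:
                return new Position("TRIM_HORIZON", null);
            case SENTINEL_AT_TIMESTAMP:
                throw new IllegalArgumentException(
                        "Unexpected sentinel type: " + lastSequenceNumber);
            default:
                // A real sequence number: resume just after it.
                return new Position("AFTER_SEQUENCE_NUMBER", lastSequenceNumber);
        }
    }

    /**
     * Models the fix: if the first batch was empty, the last sequence number
     * is still the AT_TIMESTAMP sentinel, so the existing timestamp starting
     * position is reused instead of being recalculated.
     */
    static Position nextStartingPosition(Position current, String lastSequenceNumber) {
        if (SENTINEL_AT_TIMESTAMP.equals(lastSequenceNumber)) {
            if (!"AT_TIMESTAMP".equals(current.iteratorType)) {
                throw new IllegalStateException(
                        "Timestamp sentinel without an AT_TIMESTAMP starting position");
            }
            return current;
        }
        return continueFrom(lastSequenceNumber);
    }

    public static void main(String[] args) {
        // Arbitrary example timestamp in the "yyyy-MM-dd'T'HH:mm:ss" format.
        Position start = new Position("AT_TIMESTAMP", "2022-10-10T13:02:00");
        // Empty first batch: the sentinel is still the last sequence number.
        Position next = nextStartingPosition(start, SENTINEL_AT_TIMESTAMP);
        System.out.println(next.iteratorType + " " + next.value);
    }
}
```

Running main prints `AT_TIMESTAMP 2022-10-10T13:02:00`. In the empty-batch case, the model restarts from the original timestamp. Calling continueFrom directly with the sentinel throws the same kind of IllegalArgumentException shown in the stack trace.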



--
This message was sent by Atlassian Jira
(v8.20.10#820010)