Posted to issues@flink.apache.org by "Qingsheng Ren (Jira)" <ji...@apache.org> on 2022/07/15 02:58:00 UTC
[jira] [Assigned] (FLINK-28185) "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
[ https://issues.apache.org/jira/browse/FLINK-28185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qingsheng Ren reassigned FLINK-28185:
-------------------------------------
Assignee: Mason Chen
> "Invalid negative offset" when using OffsetsInitializer.timestamp(.)
> --------------------------------------------------------------------
>
> Key: FLINK-28185
> URL: https://issues.apache.org/jira/browse/FLINK-28185
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Flink 1.15.0
> Kafka 2.8.1
> Reporter: Peter Schrott
> Assignee: Mason Chen
> Priority: Minor
> Attachments: Bildschirmfoto 2022-06-21 um 15.24.58-1.png, NegativeOffsetSpec.scala
>
>
> When using {{OffsetsInitializer.timestamp(.)}} on a topic with empty partitions – little traffic + low retention – an {{IllegalArgumentException: Invalid negative offset}} occurs. See stack trace below.
> The problem is that the admin client returns -1 as timestamp and offset for empty partitions in {{KafkaAdminClient.listOffsets(.)}}. [1] See the attached screenshot. The exception is thrown when creating an {{OffsetAndTimestamp}} object from the admin client response, because its constructor rejects negative offsets.
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition splits due to
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
> at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
> at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> at java.util.concurrent.FutureTask.run(FutureTask.java)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
> at org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
> at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
> ... 8 common frames omitted
> 15:25:58.025 INFO [flink-akka.actor.default-dispatcher-11] o.a.f.runtime.jobmaster.JobMaster - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: XXX -> YYY -> Sink: ZZZ' (operator 351e440289835f2ff3e6fee31bf6e13c).
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
> at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
> at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
> at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329)
> at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
> at java.util.concurrent.FutureTask.run(FutureTask.java)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to initialize partition splits due to
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:299)
> at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
> at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
> ... 8 common frames omitted
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
> at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:36)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1723)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
> at org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.initializePartitionSplits(KafkaSourceEnumerator.java:272)
> at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.lambda$checkPartitionChanges$0(KafkaSourceEnumerator.java:242)
> at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
> ... 8 common frames omitted {code}
> *Expected Result:*
> Consumer is initialized and records from partitions that contain data after the given timestamp are consumed. Newly incoming data on "empty" partitions is also consumed.
> *Actual Result:*
> Consumer is not initialized. No data is consumed.
>
> [1] [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L604]
>
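The fix suggested by the root cause above – {{listOffsets}} reporting -1 for empty partitions – would be to filter out such partitions before constructing {{OffsetAndTimestamp}} (and let them fall back to another strategy, e.g. latest). A minimal sketch of that guard, where {{RawOffset}} is a hypothetical stand-in for the admin client's per-partition result type, not the actual Kafka API:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetGuard {

    // Hypothetical stand-in for the (offset, timestamp) pair returned by
    // KafkaAdminClient.listOffsets(); -1 marks an empty partition.
    record RawOffset(long offset, long timestamp) {}

    // Keep only partitions with a non-negative offset, mirroring the check
    // the enumerator could apply before calling the OffsetAndTimestamp
    // constructor (which throws on negative offsets).
    static Map<String, RawOffset> dropEmptyPartitions(Map<String, RawOffset> raw) {
        Map<String, RawOffset> valid = new HashMap<>();
        raw.forEach((partition, result) -> {
            if (result.offset() >= 0) {
                valid.put(partition, result);
            }
        });
        return valid;
    }

    public static void main(String[] args) {
        Map<String, RawOffset> raw = new HashMap<>();
        raw.put("topic-0", new RawOffset(42L, 1_655_800_000_000L));
        raw.put("topic-1", new RawOffset(-1L, -1L)); // empty partition
        Map<String, RawOffset> valid = dropEmptyPartitions(raw);
        System.out.println(valid.containsKey("topic-0")); // true
        System.out.println(valid.containsKey("topic-1")); // false
    }
}
```

Partitions dropped here would then need a fallback offset (e.g. earliest/latest) so that newly arriving data on them is still picked up, matching the expected result described above.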
--
This message was sent by Atlassian Jira
(v8.20.10#820010)