You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/24 21:42:26 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

mjsax commented on a change in pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#discussion_r460300664



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -291,27 +290,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
-                    final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
-                    for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
-                        if (record.key() != null) {
-                            restoreRecords.add(recordConverter.convert(record));
-                        }
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+                final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    if (record.key() != null) {
+                        restoreRecords.add(recordConverter.convert(record));
                     }
-                    offset = globalConsumer.position(topicPartition);
-                    stateRestoreAdapter.restoreBatch(restoreRecords);
-                    stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
-                    restoreCount += restoreRecords.size();
-                } catch (final InvalidOffsetException recoverableException) {

Review comment:
       This is the actual bug: we swallow the exception. However, because we don't do any "seek", we just hit the same exception in `poll()` over and over and never recover but loop forever.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org