Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/08 22:33:10 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451860750



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
         return records;
     }
 
+    private OffsetResetStrategy getResetStrategy(final TopicPartition partition) {
+        if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.EARLIEST;
+        } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.LATEST;
+        } else {
+            if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                return OffsetResetStrategy.EARLIEST;
+            }

Review comment:
       I honestly couldn't figure out what the ultimate fallback reset strategy is... Judging from the test's behavior on first startup, if no strategy is set and there is no committed offset, the client starts at the beginning, but ConsumerConfig declares "latest" as the default policy... What gives?
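A minimal sketch of the lookup order the getResetStrategy diff above appears to follow: a topic matching the "earliest" pattern resets to EARLIEST, one matching the "latest" pattern resets to LATEST, and any other topic uses the configured auto.offset.reset value, falling back to EARLIEST when that value is missing or unrecognized. A likely explanation for the puzzle in the comment: ConsumerConfig does default auto.offset.reset to "latest", but Kafka Streams overrides that consumer default to "earliest" in StreamsConfig, so a Streams application with no committed offsets starts from the beginning. ResetStrategy and ResetStrategyResolver are hypothetical stand-ins with no Kafka dependency, so the snippet compiles on its own.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for org.apache.kafka.clients.consumer.OffsetResetStrategy.
enum ResetStrategy { EARLIEST, LATEST }

// Hypothetical resolver mirroring the lookup order of the getResetStrategy diff.
final class ResetStrategyResolver {
    private final Pattern earliestTopics; // topics registered with an "earliest" reset policy
    private final Pattern latestTopics;   // topics registered with a "latest" reset policy
    private final String originalReset;   // configured auto.offset.reset value; may be null

    ResetStrategyResolver(final Pattern earliestTopics,
                          final Pattern latestTopics,
                          final String originalReset) {
        this.earliestTopics = earliestTopics;
        this.latestTopics = latestTopics;
        this.originalReset = originalReset;
    }

    ResetStrategy resolve(final String topic) {
        if (earliestTopics.matcher(topic).matches()) {
            return ResetStrategy.EARLIEST;
        } else if (latestTopics.matcher(topic).matches()) {
            return ResetStrategy.LATEST;
        } else if ("latest".equals(originalReset)) {
            return ResetStrategy.LATEST;
        } else {
            // Unset, "earliest", or any other value (e.g. "none") falls back to EARLIEST,
            // matching the Streams-level default of "earliest".
            return ResetStrategy.EARLIEST;
        }
    }
}
```

Checking the topic patterns before the global setting lets per-topic reset policies registered on the topology take precedence over the application-wide auto.offset.reset.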

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it has been re-initialized
+            // Note: closeDirty already clears the PartitionGroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) {
+                final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            mainConsumer().seekToEnd(Collections.singleton(topicPartition));
+                            break;
+                        default:
+                            throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
+                    }
+                } else {
+                    mainConsumer().seek(topicPartition, offsetAndMetadata);
+                }
+            }

Review comment:
       This might be the worst thing I've ever proposed for AK... I can't figure out a better way to just "reset" the offset.
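The rewind performed in closeAndRevive can be sketched without a broker. RewindableConsumer, RecordingConsumer, TaskRewinder, and ResetPolicy are hypothetical names; the interface only mirrors the shape of the consumer calls used in the diff (pause, committed, seek, seekToBeginning, seekToEnd), with partitions simplified to strings and a missing map entry standing for "no committed offset". For the LATEST policy the sketch calls seekToEnd, since seeking to the beginning would make LATEST behave exactly like EARLIEST.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for OffsetResetStrategy.
enum ResetPolicy { EARLIEST, LATEST }

// Hypothetical, minimal subset of the consumer operations used in the diff.
interface RewindableConsumer {
    void pause(Set<String> partitions);
    Map<String, Long> committed(Set<String> partitions); // missing key = no committed offset
    void seek(String partition, long offset);
    void seekToBeginning(String partition);
    void seekToEnd(String partition);
}

final class TaskRewinder {
    static void rewind(final RewindableConsumer consumer,
                       final Set<String> inputPartitions,
                       final Function<String, ResetPolicy> resetPolicy) {
        // Stop fetching for this task until it has been re-initialized.
        consumer.pause(inputPartitions);
        final Map<String, Long> committed = consumer.committed(inputPartitions);
        for (final String partition : inputPartitions) {
            final Long offset = committed.get(partition);
            if (offset != null) {
                // Resume from the last committed position.
                consumer.seek(partition, offset);
            } else {
                // No committed offset: fall back to the partition's reset policy.
                switch (resetPolicy.apply(partition)) {
                    case EARLIEST:
                        consumer.seekToBeginning(partition);
                        break;
                    case LATEST:
                        consumer.seekToEnd(partition);
                        break;
                    default:
                        throw new IllegalArgumentException("Unexpected reset policy");
                }
            }
        }
    }
}

// Test double that records each call, in order, as a short string.
final class RecordingConsumer implements RewindableConsumer {
    final List<String> calls = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>();

    public void pause(final Set<String> partitions) { calls.add("pause " + partitions); }
    public Map<String, Long> committed(final Set<String> partitions) { return committedOffsets; }
    public void seek(final String partition, final long offset) { calls.add("seek " + partition + " " + offset); }
    public void seekToBeginning(final String partition) { calls.add("beginning " + partition); }
    public void seekToEnd(final String partition) { calls.add("end " + partition); }
}
```

Pausing before seeking mirrors the comment in the diff: no further records are fetched for the task's partitions until they are resumed, presumably once the task has been re-initialized.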




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org