Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/09 00:27:39 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Skip processing if task isn't running

mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r451895176



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -193,6 +197,28 @@ private void closeAndRevive(final Map<Task, Collection<TopicPartition>> taskWith
                 log.error("Error suspending corrupted task {} ", task.id(), swallow);
             }
             task.closeDirty();
+            // Pause so we won't poll any more records for this task until it has been re-initialized
+            // Note, closeDirty already clears the partitiongroup for the task.
+            mainConsumer().pause(task.inputPartitions());
+            final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(task.inputPartitions());
+            for (final TopicPartition topicPartition : task.inputPartitions()) {
+                final OffsetAndMetadata offsetAndMetadata = committed.get(topicPartition);
+                if (offsetAndMetadata == null) {
+                    final OffsetResetStrategy strategy = resetStrategy.apply(topicPartition);
+                    switch (strategy) {
+                        case EARLIEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        case LATEST:
+                            mainConsumer().seekToBeginning(Collections.singleton(topicPartition));
+                            break;
+                        default:
+                            throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
+                    }
+                } else {
+                    mainConsumer().seek(topicPartition, offsetAndMetadata);
+                }
+            }

Review comment:
       Why do you think this is bad? That is just how the API works. Cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L769-L802 which does the same thing.
   
   Which makes me wonder whether we can share common code for both cases?
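
   As a rough illustration of what shared code for both cases might look like, here is a minimal sketch. It is not the actual Streams code: `OffsetResetSketch`, `SeekableConsumer`, `Strategy`, and `resetOffsets` are hypothetical stand-ins so that the example compiles without Kafka on the classpath, and committed offsets are simplified to plain `Long` values. Note that this sketch maps `LATEST` to `seekToEnd`, whereas the quoted diff calls `seekToBeginning` in both branches.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

final class OffsetResetSketch {
    // Hypothetical stand-in for Kafka's OffsetResetStrategy.
    enum Strategy { EARLIEST, LATEST }

    // Hypothetical minimal view of the consumer calls used in the diff.
    interface SeekableConsumer<P> {
        void seek(P partition, long offset);
        void seekToBeginning(Collection<P> partitions);
        void seekToEnd(Collection<P> partitions);
    }

    // Seeks each partition to its committed offset, or applies the
    // per-partition reset strategy when nothing has been committed.
    static <P> void resetOffsets(final SeekableConsumer<P> consumer,
                                 final Collection<P> partitions,
                                 final Map<P, Long> committed,
                                 final Function<P, Strategy> strategyFor) {
        for (final P partition : partitions) {
            final Long offset = committed.get(partition);
            if (offset != null) {
                consumer.seek(partition, offset);
                continue;
            }
            final Strategy strategy = strategyFor.apply(partition);
            switch (strategy) {
                case EARLIEST:
                    consumer.seekToBeginning(Collections.singleton(partition));
                    break;
                case LATEST:
                    consumer.seekToEnd(Collections.singleton(partition));
                    break;
                default:
                    throw new IllegalArgumentException("Unexpected reset strategy: " + strategy);
            }
        }
    }
}
```

   Both call sites could then pass in their own consumer and strategy function, assuming they agree on how a missing committed offset is represented.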

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -137,7 +138,7 @@
         STARTING(2, 3, 5),                // 1
         PARTITIONS_REVOKED(2, 3, 5),      // 2
         PARTITIONS_ASSIGNED(2, 3, 4, 5),  // 3
-        RUNNING(2, 3, 5),                 // 4
+        RUNNING(2, 3, 4, 5),              // 4

Review comment:
       Why do we need this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
         return records;
     }
 
+    private OffsetResetStrategy getResetStrategy(final TopicPartition partition) {
+        if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.EARLIEST;
+        } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+            return OffsetResetStrategy.LATEST;
+        } else {
+            if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                return OffsetResetStrategy.EARLIEST;
+            }

Review comment:
       > Does Streams override the client default..?
   
   Yes. The client default is "latest" but we use "earliest" by default (cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L857). Of course, users can also change the default via StreamsConfig.
   
   Note that the consumer client can only apply a single strategy to all topics it is subscribed to. Hence, if all topics use the same reset policy, we can rely on the consumer's configured policy. However, if users specify different reset policies in their code via `Consumed` for individual topics, the consumer is re-configured to use "none" (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L362-L366) and we do a manual seekToBeginning/seekToEnd according to the user-defined strategy for the corresponding topic (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L762-L764), because we need to make a per-topic decision that the consumer cannot make for us.
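
   The per-topic decision described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Streams implementation: `ResetStrategy` and `ResetStrategyResolver` are hypothetical stand-ins for `OffsetResetStrategy` and the builder's topic patterns, and the fallback (anything other than "latest" resolves to earliest) is an assumption based on the Streams default mentioned above.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for Kafka's OffsetResetStrategy, so the sketch
// compiles without Kafka on the classpath.
enum ResetStrategy { EARLIEST, LATEST }

final class ResetStrategyResolver {
    private final Pattern earliestTopics;
    private final Pattern latestTopics;
    private final String configuredReset; // may be null if the user set nothing

    ResetStrategyResolver(final Pattern earliestTopics,
                          final Pattern latestTopics,
                          final String configuredReset) {
        this.earliestTopics = earliestTopics;
        this.latestTopics = latestTopics;
        this.configuredReset = configuredReset;
    }

    // Per-topic overrides win; otherwise fall back to the configured value,
    // and to EARLIEST when that value is missing or unrecognized (assumption).
    ResetStrategy resolve(final String topic) {
        if (earliestTopics.matcher(topic).matches()) {
            return ResetStrategy.EARLIEST;
        }
        if (latestTopics.matcher(topic).matches()) {
            return ResetStrategy.LATEST;
        }
        return "latest".equals(configuredReset) ? ResetStrategy.LATEST : ResetStrategy.EARLIEST;
    }
}
```

   For example, with the hypothetical patterns `orders|payments` for earliest and `clicks` for latest, `resolve("clicks")` yields `LATEST`, while a topic matching neither pattern falls back to the configured value.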

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -766,6 +769,24 @@ void runOnce() {
         return records;
     }
 
+    private OffsetResetStrategy getResetStrategy(final TopicPartition partition) {

Review comment:
       Wondering if we should reuse this method within `StreamThread#resetInvalidOffsets`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1140,4 +1166,12 @@ public static void executeAndMaybeSwallow(final boolean clean,
             throw e; },
             e -> log.debug("Ignoring error in unclean {}", name));
     }
+
+    boolean hasPreRunningTasks() {
+        return tasks().values().stream().anyMatch(Task::preRunning);
+    }
+
+    public void setResetStrategy(final Function<TopicPartition, OffsetResetStrategy> resetStrategy) {

Review comment:
       I think the name of the method is not ideal. We don't set the strategy; we set a function that can compute the strategy for a partition. I needed to go back and forth on the PR to understand how it works, and I assume the method name played its part in confusing me.



