You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/17 21:41:43 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11738: MINOR: move non-management methods from TaskManager to Task Executor

wcarlson5 commented on a change in pull request #11738:
URL: https://github.com/apache/kafka/pull/11738#discussion_r809488943



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.CommitFailedException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
+import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
+
+/**
+ * Single-threaded executor class for the active tasks assigned to this thread.
+ */
+public class TaskExecutor {
+
+    private final Logger log;
+
+    private final ProcessingMode processingMode;
+    private final Tasks tasks;
+
+    /**
+     * Creates an executor that operates on the given set of tasks.
+     *
+     * @param tasks          the tasks this executor works on
+     * @param processingMode the processing mode this executor is configured with
+     * @param logContext     context used to create the logger for this executor
+     */
+    public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, final LogContext logContext) {
+        this.log = logContext.logger(getClass());
+        this.processingMode = processingMode;
+        this.tasks = tasks;
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * @throws StreamsException      if any task threw an exception while processing
+     */
+    int process(final int maxNumRecords, final Time time) {
+        int totalProcessed = 0;
+
+        for (final Task task : tasks.activeTasks()) {
+            totalProcessed += processTask(task, maxNumRecords, time);
+        }
+
+        return totalProcessed;
+    }
+
+    /**
+     * Processes records of a single task until either {@code maxNumRecords} records have been
+     * processed or {@code task.process} reports that nothing was processed, and records the
+     * elapsed time of the batch on the task.
+     *
+     * <p>A {@link TimeoutException} is handed to {@code task.maybeInitTaskTimeoutOrThrow}; unless
+     * that call throws, the exception is only logged at debug level and the records processed
+     * so far are returned, so the caller can move on to the next task.
+     *
+     * @param task          the task to process records for
+     * @param maxNumRecords upper bound on the number of records to process in this call
+     * @param time          clock used for the processing timestamp and the batch duration
+     * @return the number of records processed in this call
+     * @throws TaskMigratedException rethrown unchanged if the task reports it was migrated
+     * @throws StreamsException      if processing failed; the task id is attached, and any other
+     *                               {@link RuntimeException} is wrapped into a {@code StreamsException}
+     */
+    private int processTask(final Task task, final int maxNumRecords, final Time time) {
+        int processed = 0;
+        // A single timestamp is taken up front and reused for every process() call of this batch.
+        final long batchStartMs = time.milliseconds();
+
+        try {
+            while (processed < maxNumRecords && task.process(batchStartMs)) {
+                task.clearTaskTimeout();
+                processed++;
+            }
+        } catch (final TimeoutException timeoutException) {
+            task.maybeInitTaskTimeoutOrThrow(batchStartMs, timeoutException);
+            // Parameterized logging: the trailing throwable is logged as the exception, and the
+            // message is only formatted when debug logging is enabled.
+            log.debug(
+                "Could not complete processing records for {} due to the following exception; will move to next task and retry later",
+                task.id(),
+                timeoutException
+            );
+        } catch (final TaskMigratedException e) {
+            log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+            throw e;
+        } catch (final StreamsException e) {
+            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            e.setTaskId(task.id());
+            throw e;
+        } catch (final RuntimeException e) {
+            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            throw new StreamsException(e, task.id());
+        } finally {
+            // Always record the batch duration, including when an exception propagates.
+            task.recordProcessBatchTime(time.milliseconds() - batchStartMs);
+        }
+        return processed;
+    }
+
+    /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
+     * @param consumedOffsetsAndMetadata an empty map that will be filled in with the prepared offsets
+     * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
+     */
+    int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
+                                                            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {

Review comment:
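       nit: the continuation line of this parameter list is not aligned with the first parameter — could you fix the alignment?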




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org