You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/08 23:39:33 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11738: KAFKA-12648: isolate tasks that hit processing errors

guozhangwang commented on a change in pull request #11738:
URL: https://github.com/apache/kafka/pull/11738#discussion_r802144978



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TaskExecutor {
+
+    private final Logger log;
+
+    private final Tasks tasks;
+
+
+    // tasks that hit an error are sent to the penalty box
+    private final SortedMap<TaskId, ErrorTask> errorTasks = new TreeMap<>();
+
+    public TaskExecutor(final Tasks tasks, final LogContext logContext) {
+        this.tasks = tasks;
+        this.log = logContext.logger(getClass());
+    }
+
+    Map<TaskId, ErrorTask> threadErrorTasks() {
+        return errorTasks;
+    }
+
+    public void registerTaskErrors(final Map<TaskId, ErrorTask> appErrorTasks) {
+        errorTasks.putAll(appErrorTasks);
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * @throws StreamsException      if any task threw an exception while processing
+     */
+    int process(final int maxNumRecords, final Time time) {
+        int totalProcessed = 0;
+
+        // First process the healthy tasks
+        for (final Task task : tasks.activeTasks()) {
+            if (!errorTasks.containsKey(task.id())) {
+                try {
+                    totalProcessed += processTask(task, maxNumRecords, time);
+                } catch (final Throwable throwable) {
+                    log.info("Sending task {} to penalty box due to hitting error", task.id());
+                    final ErrorTask errorTask= new ErrorTask((StreamTask) task);

Review comment:
       nit: space before `=`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -71,6 +73,7 @@
     private final Map<String, StateStore> globalStateStores = new HashMap<>();
     private final Set<String> allInputTopics = new HashSet<>();
     private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskId, ErrorTask> errorTasks = new ConcurrentHashMap<>();

Review comment:
       Not sure why we'd need to keep the error tasks mapping in two places and each is used to populate the other (see my other comment above). If the goal is to live beyond thread replacements then we don't we just keep it on the global level --- a.k.a. here, rather than within a thread's task executor as well? I.e. whenever we hit an exception we immediately call the topology metadata and add it here, so that we do not need the local mapping, and then for retrying the error tasks the thread would just try to loop over its assigned tasks' ids and see if any is in the mapping. This may be a bit less efficient but we avoid the complexity of bookkeeping in two places, a global mapping and a local per-thread mapping.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -349,6 +343,7 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             throw first.getValue();
         }
 
+        taskExecutor.registerTaskErrors(topologyMetadata.applicationErrorTasks());

Review comment:
       We have two callers to register task errors, one is inside the task executor when we hit an exception and then immediately populate the map, which makes sense; and the other one is here that we register from the topology metadata, but the metadata's registered map is from the executor as well. So it seems to me that we have a circular call trace that populates of the error mappings, and not clear why.. Could you explain a bit? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -141,6 +144,22 @@ public void unregisterThread(final String threadName) {
         maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
     }
 
+    public void registerTaskErrors(final Map<TaskId, ErrorTask> threadErrorTasks) {
+        errorTasks.putAll(threadErrorTasks);
+    }
+
+    public void clearErrorStatusForTask(final TaskId taskId) {
+        errorTasks.remove(taskId);
+    }
+
+    public Map<TaskId, ErrorTask> applicationErrorTasks() {
+        return errorTasks;
+    }
+
+    public ErrorTask taskErrorState(final TaskId taskId) {

Review comment:
       This function is not used.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class TaskExecutor {
+
+    private final Logger log;
+
+    private final Tasks tasks;
+
+
+    // tasks that hit an error are sent to the penalty box
+    private final SortedMap<TaskId, ErrorTask> errorTasks = new TreeMap<>();
+
+    public TaskExecutor(final Tasks tasks, final LogContext logContext) {
+        this.tasks = tasks;
+        this.log = logContext.logger(getClass());
+    }
+
+    Map<TaskId, ErrorTask> threadErrorTasks() {
+        return errorTasks;
+    }
+
+    public void registerTaskErrors(final Map<TaskId, ErrorTask> appErrorTasks) {
+        errorTasks.putAll(appErrorTasks);
+    }
+
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     * @throws StreamsException      if any task threw an exception while processing
+     */
+    int process(final int maxNumRecords, final Time time) {
+        int totalProcessed = 0;
+
+        // First process the healthy tasks
+        for (final Task task : tasks.activeTasks()) {
+            if (!errorTasks.containsKey(task.id())) {
+                try {
+                    totalProcessed += processTask(task, maxNumRecords, time);
+                } catch (final Throwable throwable) {
+                    log.info("Sending task {} to penalty box due to hitting error", task.id());
+                    final ErrorTask errorTask= new ErrorTask((StreamTask) task);
+                    errorTask.addError(throwable);
+                    errorTasks.put(task.id(), errorTask);
+                }
+            }
+        }
+
+        // Then process any error tasks that are ready to be retried
+        for (final ErrorTask task : errorTasks.values()) {
+            if (task.readyForRetry()) {
+                try {
+                    totalProcessed += processTask(task.streamTask, maxNumRecords, time);
+                    log.info("Removing task {} from penalty box after clean run", task.streamTask.id());
+                    tasks.topologyMetadata().clearErrorStatusForTask(task.streamTask.id());
+                } catch (final Throwable throwable) {
+                    log.debug("Keeping task {} to penalty box due to hitting error", task.streamTask.id());
+                }
+            }
+        }
+
+        return totalProcessed;
+    }
+
+    private long processTask(final Task task, final int maxNumRecords, final Time time) {
+        int processed = 0;
+        long now = time.milliseconds();
+
+        final long then = now;
+        try {
+            while (processed < maxNumRecords && task.process(now)) {
+                task.clearTaskTimeout();
+                processed++;
+            }
+        } catch (final TimeoutException timeoutException) {
+            task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+            log.debug(
+                String.format(
+                    "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
+                    task.id()),
+                timeoutException
+            );
+        } catch (final TaskMigratedException e) {
+            log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+            throw e;
+        } catch (final StreamsException e) {
+            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            e.setTaskId(task.id());
+            throw e;
+        } catch (final RuntimeException e) {
+            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            throw new StreamsException(e, task.id());
+        } finally {
+            now = time.milliseconds();
+            task.recordProcessBatchTime(now - then);
+        }
+        return processed;
+    }
+
+    public boolean isInError(final TaskId taskId) {
+        return errorTasks.containsKey(taskId);
+    }
+
+    static class ErrorTask implements Comparable<ErrorTask> {
+
+        private final StreamTask streamTask;
+        private Queue<Throwable> errors = new LinkedList<>();
+
+        public ErrorTask(final StreamTask streamTask) {
+            this.streamTask = streamTask;
+        }
+
+        public boolean readyForRetry() {
+            // TODO: implement backoff for persisting errors
+            return true;
+        }
+
+        public void addError(final Throwable throwable) {

Review comment:
       The following functions are not used in this PR, assuming they will be used in future PRs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org