You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/17 11:58:05 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #11787: KAFKA-12738: tracking task errors

ableegoldman opened a new pull request #11787:
URL: https://github.com/apache/kafka/pull/11787


   Note: this is just part 1 of the error handling work, and is intended to lay the groundwork for identifying and handling tasks that are unstable so that they don't affect the ability of the other named topologies to make progress.
   
   For some idea of what exactly these changes are needed to support, the current plan is for later PRs to tackle improvements/optimizations such as
   
   1. doing an iteration without the error task so that the healthy tasks can be committed (necessary for eos-v2, for which you can't commit/abort individual partitions of a transaction)
   2. implementing true backoff for tasks experiencing frequent or constant errors
   3. error categorization (and possibly other heuristics) to enable classifying exceptions as "retriable" vs "thread fatal", allowing us to optimize the impact of task errors by skipping the thread replacement when the blast radius is contained and the thread state is not corrupted (for example, we currently apply this optimization already for the specific case of a MissingSourceTopicException, since there may be other named topologies that aren't missing any topics and can be processed as usual)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812575798



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.UnknownTopologyException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private Logger log;
+
+    private final boolean hasNamedTopologies;
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        allTopologyNames.forEach(name -> topologyNameToMetadata.put(name, new NamedTopologyMetadata(name)));
+    }
+
+    public void setLog(final LogContext logContext) {
+        log = logContext.logger(getClass());
+    }
+
+    public boolean canProcessTopology(final String topologyName) {
+        if (!hasNamedTopologies) {

Review comment:
       See [this](https://github.com/apache/kafka/pull/11787/files#diff-60815d69ce39481b546def9314419abcd040694f75357a46e9dd43af5fc15521R102-R104) -- we do check to make sure that if named topologies are used, then the `topologyName` is _not_ `null` or `UNNAMED_TOPOLOGY` -- however for apps that do not use named topologies I think it's fair to just return "true" here and not require any assertions about the name, since it's a dummy name in that case anyways.
   
   SG?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r813714182



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final Properties properties = props();
+
+    // Task 0
+    private final String inputTopic = "input" + testId;
+    private final String outputTopic = "output" + testId;
+    // Task 1
+    private final String errorInputTopic = "error-input" + testId;
+    private final String errorOutputTopic = "error-output" + testId;
+
+    @Before
+    public void setup() {
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
+    }
+
+    private Properties props() {
+        return mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000),
+                mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000)
+            )
+        );
+    }
+
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {

Review comment:
       Would like to add a test for the case of tasks in different named topologies, but perhaps in a followup PR to unblock this one 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {
+        private final Logger log;
+        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+        public NamedTopologyMetadata(final String topologyName) {
+            final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+            this.log = logContext.logger(NamedTopologyMetadata.class);
+        }
+
+        public boolean canProcess() {
+            // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+            return true;
+        }
+
+        public boolean canProcessTask(final Task task, final long now) {
+            // TODO: implement exponential backoff, for now we just wait 15s
+            final Long errorTime = tasksToErrorTime.get(task.id());
+            if (errorTime == null) {
+                return true;
+            } else if (now - errorTime > 15000L) {

Review comment:
       Because it was actually taking the thread 10s to come back up (in the integration test where we overrode `session.timeout` to 10s) before we had https://github.com/apache/kafka/pull/11801
   
   Now with that fix it takes .5 - 4s for the thread to be replaced, so there's no particular reason to have it be 15s. I think it makes sense to lower it to maybe 5s for now, and then when we have the true exponential backoff obviously it can start lower and grow from there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815264270



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {
+        private final Logger log;
+        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+        public NamedTopologyMetadata(final String topologyName) {
+            final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+            this.log = logContext.logger(NamedTopologyMetadata.class);
+        }
+
+        public boolean canProcess() {
+            // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+            return true;
+        }
+
+        public boolean canProcessTask(final Task task, final long now) {
+            // TODO: implement exponential backoff, for now we just wait 15s
+            final Long errorTime = tasksToErrorTime.get(task.id());
+            if (errorTime == null) {
+                return true;
+            } else if (now - errorTime > 15000L) {

Review comment:
       Because it was actually taking the thread 10s to come back up (in the integration test where we overrode `session.timeout` to 10s) before we had https://github.com/apache/kafka/pull/11801
   
   Now with that fix it takes under .5s for the thread to be replaced, so there's no particular reason to have it be 15s. I think it makes sense to lower it to maybe 5s for now, and then when we have the true exponential backoff obviously it can start lower and grow from there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r814202232



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {
+        private final Logger log;
+        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+        public NamedTopologyMetadata(final String topologyName) {
+            final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+            this.log = logContext.logger(NamedTopologyMetadata.class);
+        }
+
+        public boolean canProcess() {
+            // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+            return true;
+        }
+
+        public boolean canProcessTask(final Task task, final long now) {
+            // TODO: implement exponential backoff, for now we just wait 15s
+            final Long errorTime = tasksToErrorTime.get(task.id());
+            if (errorTime == null) {
+                return true;
+            } else if (now - errorTime > 15000L) {

Review comment:
       Curious why the magic number of 15s?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {

Review comment:
       nit: we can declare this as `private class` right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -110,11 +120,11 @@ private long processTask(final Task task, final int maxNumRecords, final Time ti
                 "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
             throw e;
         } catch (final StreamsException e) {
-            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);

Review comment:
       Sorry to see log4j still have not figured out the way for both string param and exception in presentation..

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final Properties properties = props();
+
+    // Task 0
+    private final String inputTopic = "input" + testId;
+    private final String outputTopic = "output" + testId;
+    // Task 1
+    private final String errorInputTopic = "error-input" + testId;
+    private final String errorOutputTopic = "error-output" + testId;
+
+    @Before
+    public void setup() {
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
+    }
+
+    private Properties props() {
+        return mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000),
+                mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000)
+            )
+        );
+    }
+
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {

Review comment:
       A meta question: do we really need an integration test to bring up the full stack for this test coverage? I'd feel a unit test with mock time just on the task executor would be sufficient?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812582675



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.UnknownTopologyException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private Logger log;
+
+    private final boolean hasNamedTopologies;
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        allTopologyNames.forEach(name -> topologyNameToMetadata.put(name, new NamedTopologyMetadata(name)));
+    }
+
+    public void setLog(final LogContext logContext) {
+        log = logContext.logger(getClass());
+    }
+
+    public boolean canProcessTopology(final String topologyName) {
+        if (!hasNamedTopologies) {

Review comment:
       SG.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815265785



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final Properties properties = props();
+
+    // Task 0
+    private final String inputTopic = "input" + testId;
+    private final String outputTopic = "output" + testId;
+    // Task 1
+    private final String errorInputTopic = "error-input" + testId;
+    private final String errorOutputTopic = "error-output" + testId;
+
+    @Before
+    public void setup() {
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
+    }
+
+    private Properties props() {
+        return mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000),
+                mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000)
+            )
+        );
+    }
+
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {

Review comment:
       I guess the reason I felt an integration test would be good was that there's some subtlety in how we may `poll` multiple times and also in how the thread replacement interplays with the backoff 
   
   Now that the integration test only takes .5s, WDYT about leaving it as an integration test, but moving it to the NamedTopologyIntergrationTest class since what does take a long time is bringing up the CLUSTER for each integration test? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r813725686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))

Review comment:
       Changed it so that we only track topologies in the map here if they have an active backoff/task in error, rather than registering and unregistering named topologies and trying to keep this in sync between the TopologyMetadata and the individual StreamThreads' view (which was starting to look pretty ugly)
   
   Instead we just pop the topology's metadata into the map when one of its tasks hits a new error, and clear it if/when all tasks are healthy again




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#issuecomment-1050219306


   Test failures are unrelated (filed https://issues.apache.org/jira/browse/KAFKA-13690)
   
   Merging to trunk, but will do an immediate followup PR to:
   
   1. delete the new ErrorHandlingIntegrationTest file and see if I can consolidate the test in there with the existing tests in NamedTopologyIntegrationTest
   2. see if I can also consolidate/cut down on some of the tests in NamedTopologyIntegrationTest (I believe there are some cases that can be compressed into a single test, saving the overhead of starting up the KafkaStreams app)
   3. add unit tests
   4. address nits


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r815372448



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final Properties properties = props();
+
+    // Task 0
+    private final String inputTopic = "input" + testId;
+    private final String outputTopic = "output" + testId;
+    // Task 1
+    private final String errorInputTopic = "error-input" + testId;
+    private final String errorOutputTopic = "error-output" + testId;
+
+    @Before
+    public void setup() {
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
+    }
+
+    private Properties props() {
+        return mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
+                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
+                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
+                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000),
+                mkEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000)
+            )
+        );
+    }
+
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {

Review comment:
       Sounds good to me --- indeed the setup time to bring a CLUSTER is the first concern I had, and the second being vulnerable to system time flakiness than a mock time. Since it seems this test is less exposed for the latter concern moving it into an existing test is sufficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812260659



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.UnknownTopologyException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private Logger log;
+
+    private final boolean hasNamedTopologies;
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+        allTopologyNames.forEach(name -> topologyNameToMetadata.put(name, new NamedTopologyMetadata(name)));
+    }
+
+    public void setLog(final LogContext logContext) {
+        log = logContext.logger(getClass());
+    }
+
+    public boolean canProcessTopology(final String topologyName) {
+        if (!hasNamedTopologies) {

Review comment:
       Should we check under if that the topologyName should always be `UNNAMED_TOPOLOGY`, or `null`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -63,8 +68,21 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
 
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Map.Entry<String, Set<StreamTask>> topologyEntry : tasks.activeTasksByTopology().entrySet()) {

Review comment:
       I think it's a bit overkill to first organize all tasks into `activeTasksByTopology` also may have unexpected scheduling bias compared to what we did today as more or less random-roundrobin. What about just checking for each task, if `canProcess(taskName) && canProcessTopology(task.topologyName())`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.UnknownTopologyException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {

Review comment:
       Thanks for bringing this up, I think we can come back and clean this up after we've gained confidence and is ready to extend beyond named topology later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r812573407



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
##########
@@ -63,8 +68,21 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
 
-        for (final Task task : tasks.activeTasks()) {
-            totalProcessed += processTask(task, maxNumRecords, time);
+        for (final Map.Entry<String, Set<StreamTask>> topologyEntry : tasks.activeTasksByTopology().entrySet()) {

Review comment:
       Guess this was a case of premature optimization -- I'll update with your suggestion for this PR and work out a better solution that doesn't skew processing for the later PR where this matters (for some context, I did this in part because in one of the followups we will back off entire named topologies when one task is failing recurringly , to avoid getting out of sync, in which case it seemed wasteful to check each task in the topology if we already know it's not ready to process.
   
   But we can revisit this when we get to that PR 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #11787: KAFKA-12738: track processing errors and implement constant-time task backoff

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11787:
URL: https://github.com/apache/kafka/pull/11787


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11787: KAFKA-12738: tracking task errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11787:
URL: https://github.com/apache/kafka/pull/11787#discussion_r811811601



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.UnknownTopologyException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {

Review comment:
       Not sure whether it is or will be cleaner in the long run to have this separate class that now has to keep up with topology additions/removals vs just doing all this bookkeeping inside the TopologyMetadata/InternalTopologyBuilder classes -- but until we can carve out time for a real tech debt cleanup of those classes which are already pretty out of control, I felt it best to pull everything out even if it meant duplicated un/registration of topologies




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org