You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/02/24 20:12:33 UTC

[kafka] branch trunk updated: KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cd4a1cb  KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
cd4a1cb is described below

commit cd4a1cb4101abcc7cdd1d2d0d73662114108f3e5
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu Feb 24 12:10:31 2022 -0800

    KAFKA-12738: track processing errors and implement constant-time task backoff (#11787)
    
    Part 1 in the initial series of error handling for named topologies.
    
    *Part 1: Track tasks with errors within a named topology & implement constant-time based task backoff
    Part 2: Implement exponential task backoff to account for recurring errors
    Part 3: Pause/backoff all tasks within a named topology in case of a long backoff/frequent errors for any individual task
    
    Reviewers:  Guozhang Wang <gu...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
 .../streams/errors/UnknownTopologyException.java   |   4 +-
 .../streams/processor/internals/StreamThread.java  |   3 +-
 .../processor/internals/TaskExecutionMetadata.java |  98 +++++++++++++
 .../streams/processor/internals/TaskExecutor.java  |  42 ++++--
 .../streams/processor/internals/TaskManager.java   |   8 +-
 .../processor/internals/TopologyMetadata.java      |  12 +-
 .../integration/EmitOnChangeIntegrationTest.java   |  78 ----------
 ...Test.java => ErrorHandlingIntegrationTest.java} | 162 +++++----------------
 8 files changed, 180 insertions(+), 227 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
index 2124405..d764484 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
@@ -24,11 +24,11 @@ public class UnknownTopologyException extends StreamsException {
     private static final long serialVersionUID = 1L;
 
     public UnknownTopologyException(final String message, final String namedTopology) {
-        super(message + "due to being unable to locate a Topology named " + namedTopology);
+        super(message + " due to being unable to locate a Topology named " + namedTopology);
     }
 
     public UnknownTopologyException(final String message, final Throwable throwable, final String namedTopology) {
-        super(message + "due to being unable to locate a Topology named " + namedTopology, throwable);
+        super(message + " due to being unable to locate a Topology named " + namedTopology, throwable);
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2a53e32..3af07bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -932,7 +932,8 @@ public class StreamThread extends Thread {
                 .ifPresent(t -> taskManager.updateTaskEndMetadata(topicPartition, t.offset()));
         }
 
-        log.debug("Main Consumer poll completed in {} ms and fetched {} records", pollLatency, numRecords);
+        log.debug("Main Consumer poll completed in {} ms and fetched {} records from partitions {}",
+            pollLatency, numRecords, records.partitions());
 
         pollSensor.record(pollLatency, now);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
new file mode 100644
index 0000000..63b53ae
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+
+import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
+
+/**
+ * Multi-threaded class that tracks the status of active tasks being processed. A single instance of this class is
+ * shared between all StreamThreads.
+ */
+public class TaskExecutionMetadata {
+    private final boolean hasNamedTopologies;
+    // map of topologies experiencing errors/currently under backoff
+    private final ConcurrentHashMap<String, NamedTopologyMetadata> topologyNameToErrorMetadata = new ConcurrentHashMap<>();
+
+    public TaskExecutionMetadata(final Set<String> allTopologyNames) {
+        this.hasNamedTopologies = !(allTopologyNames.size() == 1 && allTopologyNames.contains(UNNAMED_TOPOLOGY));
+    }
+
+    public boolean canProcessTask(final Task task, final long now) {
+        final String topologyName = task.id().topologyName();
+        if (!hasNamedTopologies) {
+            // TODO implement error handling/backoff for non-named topologies (needs KIP)
+            return true;
+        } else {
+            final NamedTopologyMetadata metadata = topologyNameToErrorMetadata.get(topologyName);
+            return metadata == null || (metadata.canProcess() && metadata.canProcessTask(task, now));
+        }
+    }
+
+    public void registerTaskError(final Task task, final Throwable t, final long now) {
+        if (hasNamedTopologies) {
+            final String topologyName = task.id().topologyName();
+            topologyNameToErrorMetadata.computeIfAbsent(topologyName, n -> new NamedTopologyMetadata(topologyName))
+                .registerTaskError(task, t, now);
+        }
+    }
+
+    class NamedTopologyMetadata {
+        private final Logger log;
+        private final Map<TaskId, Long> tasksToErrorTime = new ConcurrentHashMap<>();
+
+        public NamedTopologyMetadata(final String topologyName) {
+            final LogContext logContext = new LogContext(String.format("topology-name [%s] ", topologyName));
+            this.log = logContext.logger(NamedTopologyMetadata.class);
+        }
+
+        public boolean canProcess() {
+            // TODO: during long task backoffs, pause the full topology to avoid it getting out of sync
+            return true;
+        }
+
+        public boolean canProcessTask(final Task task, final long now) {
+            // TODO: implement exponential backoff, for now we just wait 15s
+            final Long errorTime = tasksToErrorTime.get(task.id());
+            if (errorTime == null) {
+                return true;
+            } else if (now - errorTime > 15000L) {
+                log.info("End backoff for task {} at t={}", task.id(), now);
+                tasksToErrorTime.remove(task.id());
+                if (tasksToErrorTime.isEmpty()) {
+                    topologyNameToErrorMetadata.remove(task.id().topologyName());
+                }
+                return true;
+            } else {
+                log.debug("Skipping processing for unhealthy task {} at t={}", task.id(), now);
+                return false;
+            }
+        }
+
+        public synchronized void registerTaskError(final Task task, final Throwable t, final long now) {
+            log.info("Begin backoff for unhealthy task {} at t={}", task.id(), now);
+            tasksToErrorTime.put(task.id(), now);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 4edc35b..cad03fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -51,12 +51,15 @@ public class TaskExecutor {
     private final boolean hasNamedTopologies;
     private final ProcessingMode processingMode;
     private final Tasks tasks;
+    private final TaskExecutionMetadata taskExecutionMetadata;
 
     public TaskExecutor(final Tasks tasks,
+                        final TaskExecutionMetadata taskExecutionMetadata,
                         final ProcessingMode processingMode,
                         final boolean hasNamedTopologies,
                         final LogContext logContext) {
         this.tasks = tasks;
+        this.taskExecutionMetadata = taskExecutionMetadata;
         this.processingMode = processingMode;
         this.hasNamedTopologies = hasNamedTopologies;
         this.log = logContext.logger(getClass());
@@ -69,23 +72,28 @@ public class TaskExecutor {
     int process(final int maxNumRecords, final Time time) {
         int totalProcessed = 0;
         Task lastProcessed = null;
-        try {
-            for (final Task task : tasks.activeTasks()) {
-                lastProcessed = task;
-                totalProcessed += processTask(task, maxNumRecords, time);
+
+        for (final Task task : tasks.activeTasks()) {
+            final long now = time.milliseconds();
+            try {
+                if (taskExecutionMetadata.canProcessTask(task, now)) {
+                    lastProcessed = task;
+                    totalProcessed += processTask(task, maxNumRecords, now, time);
+                }
+            } catch (final Throwable t) {
+                taskExecutionMetadata.registerTaskError(task, t, now);
+                tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+                commitSuccessfullyProcessedTasks();
+                throw t;
             }
-        } catch (final Exception e) {
-            tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
-            commitSuccessfullyProcessedTasks();
-            throw e;
         }
 
         return totalProcessed;
     }
 
-    private long processTask(final Task task, final int maxNumRecords, final Time time) {
+    private long processTask(final Task task, final int maxNumRecords, final long begin, final Time time) {
         int processed = 0;
-        long now = time.milliseconds();
+        long now = begin;
 
         final long then = now;
         try {
@@ -94,12 +102,14 @@ public class TaskExecutor {
                 processed++;
             }
             // TODO: enable regardless of whether using named topologies
-            if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+            if (processed > 0 && hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
+                log.trace("Successfully processed task {}", task.id());
                 tasks.addToSuccessfullyProcessed(task);
             }
         } catch (final TimeoutException timeoutException) {
+            // TODO consolidate TimeoutException retries with general error handling
             task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-            log.debug(
+            log.error(
                 String.format(
                     "Could not complete processing records for %s due to the following exception; will move to next task and retry later",
                     task.id()),
@@ -110,11 +120,11 @@ public class TaskExecutor {
                 "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
             throw e;
         } catch (final StreamsException e) {
-            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
             e.setTaskId(task.id());
             throw e;
         } catch (final RuntimeException e) {
-            log.error("Failed to process stream task {} due to the following error:", task.id(), e);
+            log.error(String.format("Failed to process stream task %s due to the following error:", task.id()), e);
             throw new StreamsException(e, task.id());
         } finally {
             now = time.milliseconds();
@@ -132,7 +142,7 @@ public class TaskExecutor {
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
     int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
-                                                            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
+                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         int committed = 0;
         for (final Task task : tasksToCommit) {
             // we need to call commitNeeded first since we need to update committable offsets
@@ -253,7 +263,7 @@ public class TaskExecutor {
         if (!tasks.successfullyProcessed().isEmpty()) {
             log.info("Streams encountered an error when processing tasks." +
                 " Will commit all previously successfully processed tasks {}",
-                tasks.successfullyProcessed().toString());
+                tasks.successfullyProcessed().stream().map(Task::id));
             commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new HashMap<>());
         }
         tasks.clearSuccessfullyProcessed();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 0ff6dc2..9cb2b9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -116,7 +116,13 @@ public class TaskManager {
         this.log = logContext.logger(getClass());
 
         this.tasks = new Tasks(logContext, topologyMetadata,  streamsMetrics, activeTaskCreator, standbyTaskCreator);
-        this.taskExecutor = new TaskExecutor(tasks, processingMode, topologyMetadata.hasNamedTopologies(), logContext);
+        this.taskExecutor = new TaskExecutor(
+            tasks,
+            topologyMetadata.taskExecutionMetadata(),
+            processingMode,
+            topologyMetadata.hasNamedTopologies(),
+            logContext
+        );
     }
 
     void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index 602abd3..dbcacf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -69,6 +69,7 @@ public class TopologyMetadata {
     private final StreamsConfig config;
     private final ProcessingMode processingMode;
     private final TopologyVersion version;
+    private final TaskExecutionMetadata taskExecutionMetadata;
 
     private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
 
@@ -96,8 +97,8 @@ public class TopologyMetadata {
 
     public TopologyMetadata(final InternalTopologyBuilder builder,
                             final StreamsConfig config) {
-        version = new TopologyVersion();
-        processingMode = StreamsConfigUtils.processingMode(config);
+        this.version = new TopologyVersion();
+        this.processingMode = StreamsConfigUtils.processingMode(config);
         this.config = config;
 
         builders = new ConcurrentSkipListMap<>();
@@ -106,6 +107,7 @@ public class TopologyMetadata {
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
     }
 
     public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
@@ -115,11 +117,11 @@ public class TopologyMetadata {
         this.config = config;
         this.log = LoggerFactory.getLogger(getClass());
 
-
         this.builders = builders;
         if (builders.isEmpty()) {
             log.info("Created an empty KafkaStreams app with no topology");
         }
+        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet());
     }
 
     // Need to (re)set the log here to pick up the `processId` part of the clientId in the prefix
@@ -160,6 +162,10 @@ public class TopologyMetadata {
         maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
     }
 
+    public TaskExecutionMetadata taskExecutionMetadata() {
+        return taskExecutionMetadata;
+    }
+
     public void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(final String threadName) {
         try {
             lock();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 2d04070..25f0f30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -30,8 +30,6 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
@@ -47,14 +45,11 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category(IntegrationTest.class)
 public class EmitOnChangeIntegrationTest {
@@ -177,77 +172,4 @@ public class EmitOnChangeIntegrationTest {
             );
         }
     }
-
-    @Test
-    public void shouldEmitRecordsAfterFailures() throws Exception {
-        final Properties properties  = mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
-                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
-            )
-        );
-
-        try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            
-            final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
-            final AtomicInteger noOutputExpected = new AtomicInteger(0);
-            final AtomicInteger twoOutputExpected = new AtomicInteger(0);
-            builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
-            builder.stream(inputTopic)
-                .peek((k, v) -> {
-                    throw new RuntimeException("Kaboom");
-                })
-                .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(outputTopic);
-
-            kafkaStreams.addNamedTopology(builder.build());            
-            
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(twoOutputExpected.get(), equalTo(2));
-        }
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
similarity index 52%
copy from streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
copy to streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
index 2d04070..40d2cf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java
@@ -22,19 +22,21 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
 import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -43,21 +45,16 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category(IntegrationTest.class)
-public class EmitOnChangeIntegrationTest {
+public class ErrorHandlingIntegrationTest {
 
     private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
@@ -74,145 +71,58 @@ public class EmitOnChangeIntegrationTest {
     @Rule
     public TestName testName = new TestName();
 
-    private static String inputTopic;
-    private static String inputTopic2;
-    private static String outputTopic;
-    private static String outputTopic2;
-    private static String appId = "";
+    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String appId = "appId_" + testId;
+    private final Properties properties = props();
+
+    // Task 0
+    private final String inputTopic = "input" + testId;
+    private final String outputTopic = "output" + testId;
+    // Task 1
+    private final String errorInputTopic = "error-input" + testId;
+    private final String errorOutputTopic = "error-output" + testId;
 
     @Before
     public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
-        appId = "appId_" + testId;
-        inputTopic = "input" + testId;
-        inputTopic2 = "input2" + testId;
-        outputTopic = "output" + testId;
-        outputTopic2 = "output2" + testId;
-        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, errorInputTopic, errorOutputTopic, inputTopic, outputTopic);
     }
 
-    @Test
-    public void shouldEmitSameRecordAfterFailover() throws Exception {
-        final Properties properties  = mkObjectProperties(
+    private Properties props() {
+        return mkObjectProperties(
             mkMap(
                 mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath()),
                 mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
+                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L),
                 mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
                 mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
-            )
+                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000))
         );
-
-        final AtomicBoolean shouldThrow = new AtomicBoolean(true);
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder.table(inputTopic, Materialized.as("test-store"))
-            .toStream()
-            .map((key, value) -> {
-                if (shouldThrow.compareAndSet(true, false)) {
-                    throw new RuntimeException("Kaboom");
-                } else {
-                    return new KeyValue<>(key, value);
-                }
-            })
-            .to(outputTopic);
-        builder.stream(inputTopic2).to(outputTopic2);
-
-        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                outputTopic2,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-        }
     }
 
     @Test
-    public void shouldEmitRecordsAfterFailures() throws Exception {
-        final Properties properties  = mkObjectProperties(
-            mkMap(
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
-                mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
-                mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
-                mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
-                mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
-                mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
-            )
-        );
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
+        final AtomicInteger noOutputExpected = new AtomicInteger(0);
+        final AtomicInteger outputExpected = new AtomicInteger(0);
 
         try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
             kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            
+
             final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
-            final AtomicInteger noOutputExpected = new AtomicInteger(0);
-            final AtomicInteger twoOutputExpected = new AtomicInteger(0);
-            builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
-            builder.stream(inputTopic)
+            builder.stream(inputTopic).peek((k, v) -> outputExpected.incrementAndGet()).to(outputTopic);
+            builder.stream(errorInputTopic)
                 .peek((k, v) -> {
                     throw new RuntimeException("Kaboom");
                 })
                 .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(outputTopic);
+                .to(errorOutputTopic);
+
+            kafkaStreams.addNamedTopology(builder.build());
 
-            kafkaStreams.addNamedTopology(builder.build());            
-            
             StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic,
+                errorInputTopic,
                 Arrays.asList(
                     new KeyValue<>(1, "A")
                 ),
@@ -223,7 +133,7 @@ public class EmitOnChangeIntegrationTest {
                     new Properties()),
                 0L);
             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                inputTopic2,
+                inputTopic,
                 Arrays.asList(
                     new KeyValue<>(1, "A"),
                     new KeyValue<>(1, "B")
@@ -240,14 +150,14 @@ public class EmitOnChangeIntegrationTest {
                     IntegerDeserializer.class,
                     StringDeserializer.class
                 ),
-                outputTopic2,
+                outputTopic,
                 Arrays.asList(
                     new KeyValue<>(1, "A"),
                     new KeyValue<>(1, "B")
                 )
             );
             assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(twoOutputExpected.get(), equalTo(2));
+            assertThat(outputExpected.get(), equalTo(2));
         }
     }
 }