You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/06/24 01:17:39 UTC

[kafka] 02/02: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close (#8900)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d609aef998e058c340ee28afbf785e0051d51794
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Jun 23 18:08:26 2020 -0700

    KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close (#8900)
    
    If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous send. This is a bit misleading, since the situation is not an unrecoverable error and so the Kafka Exception is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: KAFKA-10169)
    
    The reason we ended up aborting an un-flushed transaction was due to the combination of
    a. always aborting the ongoing transaction when any task is closed/revoked
    b. only committing (and flushing) if at least one of the revoked tasks needs to be committed (regardless of whether any non-revoked tasks have data/transaction in flight)
    
    Given the above, we can end up with an ongoing transaction that isn't committed since none of the revoked tasks have any data in the transaction. We then abort the transaction anyway, when those tasks are closed. So in addition to the above (swallowing this exception), we should avoid unnecessarily aborting data for tasks that haven't been revoked.
    
    We can handle this by splitting the RecordCollector's close into a dirty and clean flavor: if dirty, we need to abort the transaction since it may be dirty due to the commit attempt failing. But if clean, we can skip aborting the transaction since we know that either we just committed and thus there is no ongoing transaction to abort, or else the transaction in flight contains no data from the tasks being closed
    
    Note that this means we still abort the transaction any time a task is closed dirty, so we must close/reinitialize any active task with pending data (that was aborted).
    
    In sum:
    
    * hackily check the KafkaException message and swallow
    * only abort the transaction during a dirty close
    * refactor shutdown to make sure we don't closeClean a task whose data was actually aborted
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Boyang Chen <bo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/errors/TaskMigratedException.java      |   4 +
 .../processor/internals/RecordCollector.java       |   9 +-
 .../processor/internals/RecordCollectorImpl.java   |  38 ++-
 .../streams/processor/internals/StreamTask.java    |  13 +-
 .../streams/processor/internals/StreamThread.java  |   1 -
 .../processor/internals/StreamsMetadataState.java  |  10 +-
 .../streams/processor/internals/TaskManager.java   | 190 +++++++----
 .../integration/RestoreIntegrationTest.java        |   8 +-
 .../processor/internals/RecordCollectorTest.java   | 352 ++++++++++++---------
 .../processor/internals/StreamTaskTest.java        |   2 +-
 .../processor/internals/TaskManagerTest.java       |   6 -
 .../org/apache/kafka/test/MockRecordCollector.java |   5 +-
 12 files changed, 400 insertions(+), 238 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
index 172a975..fdb3ab8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/TaskMigratedException.java
@@ -26,6 +26,10 @@ public class TaskMigratedException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
 
+    public TaskMigratedException(final String message) {
+        super(message + "; it means all tasks belonging to this thread should be migrated.");
+    }
+
     public TaskMigratedException(final String message, final Throwable throwable) {
         super(message + "; it means all tasks belonging to this thread should be migrated.", throwable);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 0101054..8b22f22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -57,9 +57,14 @@ public interface RecordCollector {
     void flush();
 
     /**
-     * Close the internal {@link Producer}.
+     * Clean close the internal {@link Producer}.
      */
-    void close();
+    void closeClean();
+
+    /**
+     * Dirty close the internal {@link Producer}.
+     */
+    void closeDirty();
 
     /**
      * The last acked offsets from the internal {@link Producer}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 180aae2..dffa3e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
@@ -61,7 +62,7 @@ public class RecordCollectorImpl implements RecordCollector {
     private final boolean eosEnabled;
     private final Map<TopicPartition, Long> offsets;
 
-    private volatile KafkaException sendException;
+    private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
 
     /**
      * @throws StreamsException fatal error that should cause the thread to die (from producer.initTxn)
@@ -170,7 +171,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
             // if there's already an exception record, skip logging offsets or new exceptions
-            if (sendException != null) {
+            if (sendException.get() != null) {
                 return;
             }
 
@@ -195,11 +196,11 @@ public class RecordCollectorImpl implements RecordCollector {
 
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
-            sendException = new StreamsException(errorMessage, exception);
+            sendException.set(new StreamsException(errorMessage, exception));
         } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
                 "indicating the task may be migrated out";
-            sendException = new TaskMigratedException(errorMessage, exception);
+            sendException.set(new TaskMigratedException(errorMessage, exception));
         } else {
             if (exception instanceof RetriableException) {
                 errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " +
@@ -210,7 +211,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
             if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
                 errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
-                sendException = new StreamsException(errorMessage, exception);
+                sendException.set(new StreamsException(errorMessage, exception));
             } else {
                 errorMessage += "\nException handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded.";
                 droppedRecordsSensor.record();
@@ -250,10 +251,26 @@ public class RecordCollectorImpl implements RecordCollector {
      * @throws TaskMigratedException recoverable error that would cause the task to be removed
      */
     @Override
-    public void close() {
-        log.info("Closing record collector");
+    public void closeClean() {
+        log.info("Closing record collector clean");
+
+        // No need to abort transaction during a clean close: either we have successfully committed the ongoing
+        // transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
+        // tasks had any data in the current transaction and therefore there is no need to commit or abort it.
+
+        checkForException();
+    }
+
+    /**
+     * @throws StreamsException fatal error that should cause the thread to die
+     * @throws TaskMigratedException recoverable error that would cause the task to be removed
+     */
+    @Override
+    public void closeDirty() {
+        log.info("Closing record collector dirty");
 
         if (eosEnabled) {
+            // We may be closing dirty because the commit failed, so we must abort the transaction to be safe
             streamsProducer.abortTransaction();
         }
 
@@ -266,8 +283,11 @@ public class RecordCollectorImpl implements RecordCollector {
     }
 
     private void checkForException() {
-        if (sendException != null) {
-            throw sendException;
+        final KafkaException exception = sendException.get();
+
+        if (exception != null) {
+            sendException.set(null);
+            throw exception;
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d6c6ea6..6e8bf40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -487,7 +487,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         switch (state()) {
             case SUSPENDED:
                 stateMgr.recycle();
-                recordCollector.close();
+                recordCollector.closeClean();
 
                 break;
 
@@ -520,9 +520,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
      */
     private void close(final boolean clean) {
         if (clean && commitNeeded) {
+            // It may be that we failed to commit a task during handleRevocation, but "forgot" this and tried to
+            // closeClean in handleAssignment. We should throw if we detect this to force the TaskManager to closeDirty
             log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to"
                           + " commit and should close as dirty instead");
-            throw new StreamsException("Tried to close dirty task as clean");
+            throw new TaskMigratedException("Tried to close dirty task as clean");
         }
         switch (state()) {
             case SUSPENDED:
@@ -542,7 +544,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
                     "state manager close",
                     log);
 
-                TaskManager.executeAndMaybeSwallow(clean, recordCollector::close, "record collector close", log);
+                TaskManager.executeAndMaybeSwallow(
+                    clean,
+                    clean ? recordCollector::closeClean : recordCollector::closeDirty,
+                    "record collector close",
+                    log
+                );
 
                 break;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 09be1f2..838223b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -340,7 +340,6 @@ public class StreamThread extends Thread {
             changelogReader,
             processId,
             logPrefix,
-            streamsMetrics,
             activeTaskCreator,
             standbyTaskCreator,
             builder,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index b554fc6..60d2994 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -152,9 +152,9 @@ public class StreamsMetadataState {
         }
 
         if (globalStores.contains(storeName)) {
-            // global stores are on every node. if we dont' have the host info
+            // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {
                 return allMetadata.get(0);
             }
             return localMetadata;
@@ -221,9 +221,9 @@ public class StreamsMetadataState {
         }
 
         if (globalStores.contains(storeName)) {
-            // global stores are on every node. if we dont' have the host info
+            // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {
                 return new KeyQueryMetadata(allMetadata.get(0).hostInfo(), Collections.emptySet(), -1);
             }
             return new KeyQueryMetadata(localMetadata.hostInfo(), Collections.emptySet(), -1);
@@ -265,7 +265,7 @@ public class StreamsMetadataState {
         if (globalStores.contains(storeName)) {
             // global stores are on every node. if we don't have the host info
             // for this host then just pick the first metadata
-            if (thisHost == UNKNOWN_HOST) {
+            if (thisHost.equals(UNKNOWN_HOST)) {
                 return allMetadata.get(0);
             }
             return localMetadata;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7759d7e..025e9f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -35,7 +35,6 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
 
@@ -69,7 +68,6 @@ public class TaskManager {
     private final ChangelogReader changelogReader;
     private final UUID processId;
     private final String logPrefix;
-    private final StreamsMetricsImpl streamsMetrics;
     private final ActiveTaskCreator activeTaskCreator;
     private final StandbyTaskCreator standbyTaskCreator;
     private final InternalTopologyBuilder builder;
@@ -93,7 +91,6 @@ public class TaskManager {
     TaskManager(final ChangelogReader changelogReader,
                 final UUID processId,
                 final String logPrefix,
-                final StreamsMetricsImpl streamsMetrics,
                 final ActiveTaskCreator activeTaskCreator,
                 final StandbyTaskCreator standbyTaskCreator,
                 final InternalTopologyBuilder builder,
@@ -103,7 +100,6 @@ public class TaskManager {
         this.changelogReader = changelogReader;
         this.processId = processId;
         this.logPrefix = logPrefix;
-        this.streamsMetrics = streamsMetrics;
         this.activeTaskCreator = activeTaskCreator;
         this.standbyTaskCreator = standbyTaskCreator;
         this.builder = builder;
@@ -679,92 +675,166 @@ public class TaskManager {
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
 
-        final Set<Task> tasksToClose = new HashSet<>();
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        tasksToCloseDirty.addAll(tryCloseCleanAllActiveTasks(clean, firstException));
+        tasksToCloseDirty.addAll(tryCloseCleanAllStandbyTasks(clean, firstException));
+
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task);
+        }
+
+        for (final Task task : activeTaskIterable()) {
+            executeAndMaybeSwallow(
+                clean,
+                () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+                e -> firstException.compareAndSet(null, e),
+                e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
+            );
+        }
+
+        executeAndMaybeSwallow(
+            clean,
+            activeTaskCreator::closeThreadProducerIfNeeded,
+            e -> firstException.compareAndSet(null, e),
+            e -> log.warn("Ignoring an exception while closing thread producer.", e)
+        );
+
+        tasks.clear();
+
+
+        // this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
+        // have still been in CREATED at the time of shutdown, since Task#close will not do so
+        executeAndMaybeSwallow(
+            clean,
+            this::releaseLockedUnassignedTaskDirectories,
+            e -> firstException.compareAndSet(null, e),
+            e -> log.warn("Ignoring an exception while unlocking remaining task directories.", e)
+        );
+
+        final RuntimeException fatalException = firstException.get();
+        if (fatalException != null) {
+            throw new RuntimeException("Unexpected exception while closing task", fatalException);
+        }
+    }
+
+    // Returns the set of active tasks that must be closed dirty
+    private Collection<Task> tryCloseCleanAllActiveTasks(final boolean clean,
+                                                         final AtomicReference<RuntimeException> firstException) {
+        if (!clean) {
+            return activeTaskIterable();
+        }
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
         final Set<Task> tasksToCommit = new HashSet<>();
         final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
 
-        for (final Task task : tasks.values()) {
-            if (clean) {
-                try {
-                    task.suspend();
-                    if (task.commitNeeded()) {
-                        tasksToCommit.add(task);
-                        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
-                        if (task.isActive()) {
-                            consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
-                        }
-                    }
-                    tasksToClose.add(task);
-                } catch (final TaskMigratedException e) {
-                    // just ignore the exception as it doesn't matter during shutdown
-                    closeTaskDirty(task);
-                } catch (final RuntimeException e) {
-                    firstException.compareAndSet(null, e);
-                    closeTaskDirty(task);
+        for (final Task task : activeTaskIterable()) {
+            try {
+                task.suspend();
+                if (task.commitNeeded()) {
+                    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit();
+                    tasksToCommit.add(task);
+                    consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
                 }
-            } else {
-                closeTaskDirty(task);
+                tasksToCloseClean.add(task);
+            } catch (final TaskMigratedException e) {
+                // just ignore the exception as it doesn't matter during shutdown
+                tasksToCloseDirty.add(task);
+            } catch (final RuntimeException e) {
+                firstException.compareAndSet(null, e);
+                tasksToCloseDirty.add(task);
             }
         }
 
-        try {
-            if (clean) {
+        // If any active tasks can't be committed, none of them can be, and all that need a commit must be closed dirty
+        if (!tasksToCloseDirty.isEmpty()) {
+            tasksToCloseClean.removeAll(tasksToCommit);
+            tasksToCloseDirty.addAll(tasksToCommit);
+        } else {
+            try {
                 commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+
                 for (final Task task : tasksToCommit) {
                     try {
                         task.postCommit();
                     } catch (final RuntimeException e) {
                         log.error("Exception caught while post-committing task " + task.id(), e);
                         firstException.compareAndSet(null, e);
+                        tasksToCloseDirty.add(task);
+                        tasksToCloseClean.remove(task);
                     }
                 }
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while committing tasks during shutdown", e);
+                firstException.compareAndSet(null, e);
+
+                // If the commit fails, everyone who participated in it must be closed dirty
+                tasksToCloseClean.removeAll(tasksToCommit);
+                tasksToCloseDirty.addAll(tasksToCommit);
             }
-        } catch (final RuntimeException e) {
-            log.error("Exception caught while committing tasks during shutdown", e);
-            firstException.compareAndSet(null, e);
         }
 
-        for (final Task task : tasksToClose) {
+        for (final Task task : tasksToCloseClean) {
             try {
                 completeTaskCloseClean(task);
             } catch (final RuntimeException e) {
+                log.error("Exception caught while clean-closing task " + task.id(), e);
                 firstException.compareAndSet(null, e);
-                closeTaskDirty(task);
+                tasksToCloseDirty.add(task);
             }
         }
 
-        for (final Task task : tasks.values()) {
-            if (task.isActive()) {
-                executeAndMaybeSwallow(
-                    clean,
-                    () -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
-                    e -> firstException.compareAndSet(null, e),
-                    e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e)
-                );
-            }
-        }
+        return tasksToCloseDirty;
+    }
 
-        tasks.clear();
+    // Returns the set of standby tasks that must be closed dirty
+    private Collection<Task> tryCloseCleanAllStandbyTasks(final boolean clean,
+                                                          final AtomicReference<RuntimeException> firstException) {
+        if (!clean) {
+            return standbyTaskIterable();
+        }
+        final Set<Task> tasksToCloseDirty = new HashSet<>();
+        final Set<Task> tasksToCloseClean = new HashSet<>();
+        final Set<Task> tasksToCommit = new HashSet<>();
 
-        executeAndMaybeSwallow(
-            clean,
-            activeTaskCreator::closeThreadProducerIfNeeded,
-            e -> firstException.compareAndSet(null, e),
-            e -> log.warn("Ignoring an exception while closing thread producer.", e)
-        );
+        for (final Task task : standbyTaskIterable()) {
+            try {
+                task.suspend();
+                if (task.commitNeeded()) {
+                    task.prepareCommit();
+                    tasksToCommit.add(task);
+                }
+                tasksToCloseClean.add(task);
+            } catch (final TaskMigratedException e) {
+                // just ignore the exception as it doesn't matter during shutdown
+                tasksToCloseDirty.add(task);
+            } catch (final RuntimeException e) {
+                firstException.compareAndSet(null, e);
+                tasksToCloseDirty.add(task);
+            }
+        }
 
-        try {
-            // this should be called after closing all tasks, to make sure we unlock the task dir for tasks that may
-            // have still been in CREATED at the time of shutdown, since Task#close will not do so
-            releaseLockedUnassignedTaskDirectories();
-        } catch (final RuntimeException e) {
-            firstException.compareAndSet(null, e);
+        for (final Task task : tasksToCommit) {
+            try {
+                task.postCommit();
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while post-committing standby task " + task.id(), e);
+                firstException.compareAndSet(null, e);
+                tasksToCloseDirty.add(task);
+                tasksToCloseClean.remove(task);
+            }
         }
 
-        final RuntimeException fatalException = firstException.get();
-        if (fatalException != null) {
-            throw new RuntimeException("Unexpected exception while closing task", fatalException);
+        for (final Task task : tasksToCloseClean) {
+            try {
+                completeTaskCloseClean(task);
+            } catch (final RuntimeException e) {
+                log.error("Exception caught while clean-closing standby task " + task.id(), e);
+                firstException.compareAndSet(null, e);
+                tasksToCloseDirty.add(task);
+            }
         }
+        return tasksToCloseDirty;
     }
 
     Set<TaskId> activeTaskIds() {
@@ -801,6 +871,10 @@ public class TaskManager {
         return standbyTaskStream().collect(Collectors.toMap(Task::id, t -> t));
     }
 
+    private List<Task> standbyTaskIterable() {
+        return standbyTaskStream().collect(Collectors.toList());
+    }
+
     private Stream<Task> standbyTaskStream() {
         return tasks.values().stream().filter(t -> !t.isActive());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index faac172..bada892 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -359,7 +359,9 @@ public class RestoreIntegrationTest {
         waitForStandbyCompletion(client1, 1, 30 * 1000L);
         waitForStandbyCompletion(client2, 1, 30 * 1000L);
 
-        assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(0));
+        // Sometimes the store happens to have already been closed sometime during startup, so just keep track
+        // of where it started and make sure it doesn't happen more times from there
+        final int initialStoreCloseCount = CloseCountingInMemoryStore.numStoresClosed();
         assertThat(restoreListener.totalNumRestored(), CoreMatchers.equalTo(0L));
 
         client2.close();
@@ -374,12 +376,12 @@ public class RestoreIntegrationTest {
 
         // After stopping instance 2 and letting instance 1 take over its tasks, we should have closed just two stores
         // total: the active and standby tasks on instance 2
-        assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(2));
+        assertThat(CloseCountingInMemoryStore.numStoresClosed(), equalTo(initialStoreCloseCount + 2));
 
         client1.close();
         waitForApplicationState(singletonList(client2), State.NOT_RUNNING, Duration.ofSeconds(60));
 
-        assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(4));
+        assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
     }
 
     private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 312ba88..1e3d232 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -141,7 +141,7 @@ public class RecordCollectorTest {
 
     @After
     public void cleanup() {
-        collector.close();
+        collector.closeClean();
     }
 
     @Test
@@ -299,7 +299,25 @@ public class RecordCollectorTest {
     }
 
     @Test
-    public void shouldAbortTxIfEosEnabled() {
+    public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
+        final StreamsProducer streamsProducer = mock(StreamsProducer.class);
+        expect(streamsProducer.eosEnabled()).andReturn(true);
+        replay(streamsProducer);
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamsProducer,
+            productionExceptionHandler,
+            streamsMetrics);
+
+        collector.closeClean();
+
+        verify(streamsProducer);
+    }
+
+    @Test
+    public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
         expect(streamsProducer.eosEnabled()).andReturn(true);
         streamsProducer.abortTransaction();
@@ -312,7 +330,7 @@ public class RecordCollectorTest {
             productionExceptionHandler,
             streamsMetrics);
 
-        collector.close();
+        collector.closeDirty();
 
         verify(streamsProducer);
     }
@@ -430,30 +448,12 @@ public class RecordCollectorTest {
     }
 
     @Test
-    public void shouldThrowTaskMigratedExceptionOnSubsequentCallWhenProducerFencedInCallback() {
+    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedInCallback() {
         final KafkaException exception = new ProducerFencedException("KABOOM!");
         final RecordCollector collector = new RecordCollectorImpl(
             logContext,
             taskId,
-            new StreamsProducer(
-                eosConfig,
-                "threadId",
-                new MockClientSupplier() {
-                    @Override
-                    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                                callback.onCompletion(null, exception);
-                                return null;
-                            }
-                        };
-                    }
-                },
-                taskId,
-                null,
-                logContext
-            ),
+            getExceptionalStreamsProducer(exception),
             productionExceptionHandler,
             streamsMetrics
         );
@@ -461,72 +461,84 @@ public class RecordCollectorTest {
 
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        TaskMigratedException thrown = assertThrows(
+        final TaskMigratedException thrown = assertThrows(
             TaskMigratedException.class, () ->
-            collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+                collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
         );
         assertEquals(exception, thrown.getCause());
         assertThat(
             thrown.getMessage(),
             equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
-                " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
+                        "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
+                        " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
+    }
+
+    @Test
+    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedInCallback() {
+        final KafkaException exception = new ProducerFencedException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            productionExceptionHandler,
+            streamsMetrics
+        );
+        collector.initialize();
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        thrown = assertThrows(TaskMigratedException.class, collector::flush);
+        final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::flush);
         assertEquals(exception, thrown.getCause());
         assertThat(
             thrown.getMessage(),
             equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
-                " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
+                        "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
+                        " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
+    }
+
+    @Test
+    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedInCallback() {
+        final KafkaException exception = new ProducerFencedException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            productionExceptionHandler,
+            streamsMetrics
+        );
+        collector.initialize();
 
-        thrown = assertThrows(TaskMigratedException.class, collector::close);
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final TaskMigratedException thrown = assertThrows(TaskMigratedException.class, collector::closeClean);
         assertEquals(exception, thrown.getCause());
         assertThat(
             thrown.getMessage(),
             equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
-                " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
+                        "\norg.apache.kafka.common.errors.ProducerFencedException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced," +
+                        " indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.")
         );
     }
 
     @Test
-    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
+    public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler() {
         final KafkaException exception = new KafkaException("KABOOM!");
         final RecordCollector collector = new RecordCollectorImpl(
             logContext,
             taskId,
-            new StreamsProducer(
-                config,
-                "threadId",
-                new MockClientSupplier() {
-                    @Override
-                    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                                callback.onCompletion(null, exception);
-                                return null;
-                            }
-                        };
-                    }
-                },
-                null,
-                null,
-                logContext
-            ),
+            getExceptionalStreamsProducer(exception),
             productionExceptionHandler,
             streamsMetrics
         );
 
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
 
-        StreamsException thrown = assertThrows(
+        final StreamsException thrown = assertThrows(
             StreamsException.class,
             () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
         );
@@ -537,23 +549,123 @@ public class RecordCollectorTest {
                 "\norg.apache.kafka.common.KafkaException: KABOOM!" +
                 "\nException handler choose to FAIL the processing, no more records would be sent.")
         );
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultExceptionHandler() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            productionExceptionHandler,
+            streamsMetrics
+        );
 
-        thrown = assertThrows(StreamsException.class, collector::flush);
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
         assertEquals(exception, thrown.getCause());
         assertThat(
             thrown.getMessage(),
             equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.KafkaException: KABOOM!" +
-                "\nException handler choose to FAIL the processing, no more records would be sent.")
+                        "\norg.apache.kafka.common.KafkaException: KABOOM!" +
+                        "\nException handler choose to FAIL the processing, no more records would be sent.")
         );
+    }
 
-        thrown = assertThrows(StreamsException.class, collector::close);
+    @Test
+    public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultExceptionHandler() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            productionExceptionHandler,
+            streamsMetrics
+        );
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
         assertEquals(exception, thrown.getCause());
         assertThat(
             thrown.getMessage(),
             equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.KafkaException: KABOOM!" +
-                "\nException handler choose to FAIL the processing, no more records would be sent.")
+                        "\norg.apache.kafka.common.KafkaException: KABOOM!" +
+                        "\nException handler choose to FAIL the processing, no more records would be sent.")
+        );
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler() {
+        final KafkaException exception = new AuthenticationException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            new AlwaysContinueProductionExceptionHandler(),
+            streamsMetrics
+        );
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException thrown = assertThrows(
+            StreamsException.class,
+            () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
+        );
+        assertEquals(exception, thrown.getCause());
+        assertThat(
+            thrown.getMessage(),
+            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
+                        "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
+        );
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueExceptionHandler() {
+        final KafkaException exception = new AuthenticationException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            new AlwaysContinueProductionExceptionHandler(),
+            streamsMetrics
+        );
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException thrown = assertThrows(StreamsException.class, collector::flush);
+        assertEquals(exception, thrown.getCause());
+        assertThat(
+            thrown.getMessage(),
+            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
+                        "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
+        );
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueExceptionHandler() {
+        final KafkaException exception = new AuthenticationException("KABOOM!");
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducer(exception),
+            new AlwaysContinueProductionExceptionHandler(),
+            streamsMetrics
+        );
+
+        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+
+        final StreamsException thrown = assertThrows(StreamsException.class, collector::closeClean);
+        assertEquals(exception, thrown.getCause());
+        assertThat(
+            thrown.getMessage(),
+            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
+                        "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
+                        "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
         );
     }
 
@@ -562,25 +674,7 @@ public class RecordCollectorTest {
         final RecordCollector collector = new RecordCollectorImpl(
             logContext,
             taskId,
-            new StreamsProducer(
-                config,
-                "threadId",
-                new MockClientSupplier() {
-                    @Override
-                    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                                callback.onCompletion(null, new Exception());
-                                return null;
-                            }
-                        };
-                    }
-                },
-                null,
-                null,
-                logContext
-            ),
+            getExceptionalStreamsProducer(new Exception()),
             new AlwaysContinueProductionExceptionHandler(),
             streamsMetrics
         );
@@ -616,73 +710,11 @@ public class RecordCollectorTest {
 
         collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
         collector.flush();
-        collector.close();
-    }
-
-    @Test
-    public void shouldThrowStreamsExceptionOnSubsequentCallIfFatalEvenWithContinueExceptionHandler() {
-        final KafkaException exception = new AuthenticationException("KABOOM!");
-        final RecordCollector collector = new RecordCollectorImpl(
-            logContext,
-            taskId,
-            new StreamsProducer(
-                config,
-                "threadId",
-                new MockClientSupplier() {
-                    @Override
-                    public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
-                        return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
-                            @Override
-                            public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-                                callback.onCompletion(null, exception);
-                                return null;
-                            }
-                        };
-                    }
-                },
-                null,
-                null,
-                logContext
-            ),
-            new AlwaysContinueProductionExceptionHandler(),
-            streamsMetrics
-        );
-
-        collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-
-        StreamsException thrown = assertThrows(
-            StreamsException.class,
-            () -> collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner)
-        );
-        assertEquals(exception, thrown.getCause());
-        assertThat(
-            thrown.getMessage(),
-            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
-        );
-
-        thrown = assertThrows(StreamsException.class, collector::flush);
-        assertEquals(exception, thrown.getCause());
-        assertThat(
-            thrown.getMessage(),
-            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
-        );
-
-        thrown = assertThrows(StreamsException.class, collector::close);
-        assertEquals(exception, thrown.getCause());
-        assertThat(
-            thrown.getMessage(),
-            equalTo("Error encountered sending record to topic topic for task 0_0 due to:" +
-                "\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!" +
-                "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.")
-        );
+        collector.closeClean();
     }
 
     @Test
-    public void shouldNotAbortTxnOnEOSCloseIfNothingSent() {
+    public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
         final RecordCollector collector = new RecordCollectorImpl(
             logContext,
@@ -709,7 +741,7 @@ public class RecordCollectorTest {
             streamsMetrics
         );
 
-        collector.close();
+        collector.closeDirty();
         assertFalse(functionCalled.get());
     }
 
@@ -774,7 +806,7 @@ public class RecordCollectorTest {
             streamsMetrics
         );
 
-        collector.close();
+        collector.closeClean();
 
         // Flush should not throw as producer is still alive.
         streamsProducer.flush();
@@ -782,12 +814,34 @@ public class RecordCollectorTest {
 
     @Test
     public void shouldNotCloseInternalProducerForNonEOS() {
-        collector.close();
+        collector.closeClean();
 
         // Flush should not throw as producer is still alive.
         streamsProducer.flush();
     }
 
+    private StreamsProducer getExceptionalStreamsProducer(final Exception exception) {
+        return new StreamsProducer(
+            config,
+            "threadId",
+            new MockClientSupplier() {
+                @Override
+                public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
+                    return new MockProducer<byte[], byte[]>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                        @Override
+                        public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
+                            callback.onCompletion(null, exception);
+                            return null;
+                        }
+                    };
+                }
+            },
+            null,
+            null,
+            logContext
+        );
+    }
+
     private static class CustomStringSerializer extends StringSerializer {
         private boolean isKey;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 59e96d4..c6ffe74 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1801,7 +1801,7 @@ public class StreamTaskTest {
     @Test
     public void shouldOnlyRecycleSuspendedTasks() {
         stateManager.recycle();
-        recordCollector.close();
+        recordCollector.closeClean();
         EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig(false, "100"), true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index a0f3be5..fcfbb1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -33,17 +33,14 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.easymock.EasyMock;
@@ -166,13 +163,10 @@ public class TaskManagerTest {
     }
 
     private void setUpTaskManager(final StreamThread.ProcessingMode processingMode) {
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
         taskManager = new TaskManager(
             changeLogReader,
             UUID.randomUUID(),
             "taskManagerTest",
-            streamsMetrics,
             activeTaskCreator,
             standbyTaskCreator,
             topologyBuilder,
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index 2a781f0..505ee68 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -81,7 +81,10 @@ public class MockRecordCollector implements RecordCollector {
     }
 
     @Override
-    public void close() {}
+    public void closeClean() {}
+
+    @Override
+    public void closeDirty() {}
 
     @Override
     public Map<TopicPartition, Long> offsets() {