You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/10/13 14:16:12 UTC

[kafka] branch trunk updated: KAFKA-12497: Skip periodic offset commits for failed source tasks (#10528)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18e60cb0001 KAFKA-12497: Skip periodic offset commits for failed source tasks (#10528)
18e60cb0001 is described below

commit 18e60cb0001f72018d9d00221253f69b8761a2e7
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Oct 13 10:15:42 2022 -0400

    KAFKA-12497: Skip periodic offset commits for failed source tasks (#10528)
    
    Also moves the Streams LogCaptureAppender class into the clients module so that it can be used by both Streams and Connect.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Kalpesh Patel <kp...@confluent.io>, John Roesler <vv...@apache.org>, Tom Bentley <tb...@redhat.com>
---
 build.gradle                                       |  1 +
 checkstyle/import-control.xml                      |  5 +--
 .../kafka/common/utils}/LogCaptureAppender.java    |  2 +-
 .../connect/runtime/SourceTaskOffsetCommitter.java |  8 ++++-
 .../kafka/connect/runtime/WorkerSourceTask.java    |  8 +++++
 .../apache/kafka/connect/runtime/WorkerTask.java   |  7 ++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 40 +++++++++++++++++-----
 .../kafka/connect/runtime/rest/RestServerTest.java |  6 ++--
 .../unit/kafka/utils/LogCaptureAppender.scala      |  6 ----
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  2 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |  2 +-
 .../integration/AdjustStreamThreadCountTest.java   |  2 +-
 .../kstream/internals/KGroupedStreamImplTest.java  |  2 +-
 .../kstream/internals/KStreamKStreamJoinTest.java  |  2 +-
 .../kstream/internals/KStreamKTableJoinTest.java   |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |  4 +--
 .../KStreamSlidingWindowAggregateTest.java         |  4 +--
 .../internals/KStreamWindowAggregateTest.java      |  2 +-
 .../internals/KTableKTableInnerJoinTest.java       |  2 +-
 .../internals/KTableKTableLeftJoinTest.java        |  2 +-
 .../internals/KTableKTableOuterJoinTest.java       |  2 +-
 .../internals/KTableKTableRightJoinTest.java       |  4 +--
 .../kstream/internals/KTableSourceTest.java        |  4 +--
 .../internals/GlobalStateManagerImplTest.java      |  2 +-
 .../internals/InternalTopicManagerTest.java        |  2 +-
 .../processor/internals/PartitionGroupTest.java    |  2 +-
 .../internals/ProcessorStateManagerTest.java       |  2 +-
 .../processor/internals/RecordCollectorTest.java   |  2 +-
 .../processor/internals/StateDirectoryTest.java    |  2 +-
 .../internals/StoreChangelogReaderTest.java        |  2 +-
 .../processor/internals/StreamThreadTest.java      |  2 +-
 .../processor/internals/TaskManagerTest.java       |  2 +-
 ...ctDualSchemaRocksDBSegmentedBytesStoreTest.java |  2 +-
 .../state/internals/AbstractKeyValueStoreTest.java |  2 +-
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  2 +-
 .../internals/AbstractSessionBytesStoreTest.java   |  2 +-
 .../internals/AbstractWindowBytesStoreTest.java    |  2 +-
 .../internals/CachingInMemorySessionStoreTest.java |  2 +-
 .../CachingPersistentSessionStoreTest.java         |  2 +-
 .../CachingPersistentWindowStoreTest.java          |  2 +-
 ...sToDbOptionsColumnFamilyOptionsAdapterTest.java |  2 +-
 .../internals/RocksDBTimestampedStoreTest.java     |  2 +-
 ...imeOrderedCachingPersistentWindowStoreTest.java |  2 +-
 .../internals/TimeOrderedWindowStoreTest.java      |  2 +-
 44 files changed, 98 insertions(+), 63 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7cf6cc44b1c..daa03038150 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1274,6 +1274,7 @@ project(':clients') {
 
     testImplementation libs.bcpkix
     testImplementation libs.junitJupiter
+    testImplementation libs.log4j
     testImplementation libs.mockitoInline
 
     testRuntimeOnly libs.slf4jlog4j
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b2959d9ae6b..ed0efe10aa1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -198,6 +198,7 @@
 
     <subpackage name="utils">
       <allow pkg="org.apache.kafka.common" />
+      <allow pkg="org.apache.log4j" />
     </subpackage>
 
     <subpackage name="quotas">
@@ -454,9 +455,6 @@
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="kafka.utils" />
         <allow pkg="org.apache.zookeeper" />
-        <subpackage name="testutil">
-          <allow pkg="org.apache.log4j" />
-        </subpackage>
       </subpackage>
     </subpackage>
   </subpackage>
@@ -573,7 +571,6 @@
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="org.apache.http"/>
         <allow pkg="io.swagger.v3.oas.annotations"/>
-        <allow pkg="kafka.utils" />
         <subpackage name="resources">
           <allow pkg="org.apache.log4j" />
         </subpackage>
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
similarity index 98%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
rename to clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
index 41d15da8c8a..0d569af30a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals.testutil;
+package org.apache.kafka.common.utils;
 
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index c3416be45c7..0aa170dcef1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -104,7 +104,13 @@ class SourceTaskOffsetCommitter {
         }
     }
 
-    private void commit(WorkerSourceTask workerTask) {
+    // Visible for testing
+    static void commit(WorkerSourceTask workerTask) {
+        if (!workerTask.shouldCommitOffsets()) {
+            log.trace("{} Skipping offset commit as there are no offsets that should be committed", workerTask);
+            return;
+        }
+
         log.debug("{} Committing offsets", workerTask);
         try {
             if (workerTask.commitOffsets()) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 8a8de1b0553..4f7fd741e61 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -194,6 +194,14 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         commitOffsets();
     }
 
+    /**
+     * @return whether an attempt to commit offsets should be made for the task; this is true unless the task has
+     * already failed (i.e., an uncaught exception was thrown while the task was running).
+     */
+    public boolean shouldCommitOffsets() {
+        return !isFailed();
+    }
+
     public boolean commitOffsets() {
         long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 0d1749901e3..5c93d52a39a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -63,6 +63,7 @@ abstract class WorkerTask implements Runnable {
     private final CountDownLatch shutdownLatch = new CountDownLatch(1);
     private final TaskMetricsGroup taskMetricsGroup;
     private volatile TargetState targetState;
+    private volatile boolean failed;
     private volatile boolean stopping;   // indicates whether the Worker has asked the task to stop
     private volatile boolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
     private final ErrorHandlingMetrics errorMetrics;
@@ -84,6 +85,7 @@ abstract class WorkerTask implements Runnable {
         this.statusListener = taskMetricsGroup;
         this.loader = loader;
         this.targetState = initialState;
+        this.failed = false;
         this.stopping = false;
         this.cancelled = false;
         this.taskMetricsGroup.recordState(this.targetState);
@@ -163,6 +165,10 @@ abstract class WorkerTask implements Runnable {
 
     protected abstract void close();
 
+    protected boolean isFailed() {
+        return failed;
+    }
+
     protected boolean isStopping() {
         return stopping;
     }
@@ -196,6 +202,7 @@ abstract class WorkerTask implements Runnable {
             statusListener.onStartup(id);
             execute();
         } catch (Throwable t) {
+            failed = true;
             if (cancelled) {
                 log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", this, t);
             } else if (stopping) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index f411efbde3b..f2530681cf9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.ParameterizedTest;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -99,6 +100,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CO
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -372,7 +374,7 @@ public class WorkerSourceTaskTest {
 
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
+        expectEmptyOffsetFlush();
 
         expectClose();
 
@@ -382,8 +384,9 @@ public class WorkerSourceTaskTest {
         Future<?> taskFuture = executor.submit(workerTask);
 
         assertTrue(awaitLatch(pollLatch));
-        //Failure in poll should trigger automatic stop of the worker
+        //Failure in poll should trigger automatic stop of the task
         assertTrue(workerTask.awaitStop(1000));
+        assertShouldSkipCommit();
 
         taskFuture.get();
         assertPollMetrics(0);
@@ -467,6 +470,7 @@ public class WorkerSourceTaskTest {
         workerTask.stop();
         workerStopLatch.countDown();
         assertTrue(workerTask.awaitStop(1000));
+        assertShouldSkipCommit();
 
         taskFuture.get();
         assertPollMetrics(0);
@@ -483,11 +487,11 @@ public class WorkerSourceTaskTest {
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger());
-        expectOffsetFlush(true);
+        expectEmptyOffsetFlush();
 
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
+        expectEmptyOffsetFlush();
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
@@ -528,7 +532,7 @@ public class WorkerSourceTaskTest {
 
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
+        expectEmptyOffsetFlush();
 
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
@@ -647,10 +651,6 @@ public class WorkerSourceTaskTest {
 
     @Test
     public void testSendRecordsProducerSendFailsImmediately() {
-        if (!enableTopicCreation)
-            // should only test with topic creation enabled
-            return;
-
         createWorkerTask();
 
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
@@ -1023,6 +1023,12 @@ public class WorkerSourceTaskTest {
         }
     }
 
+    private void expectEmptyOffsetFlush() throws Exception {
+        EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
+        sourceTask.commit();
+        EasyMock.expectLastCall();
+    }
+
     private void assertPollMetrics(int minimumPollCountExpected) {
         MetricGroup sourceTaskGroup = workerTask.sourceTaskMetricsGroup().metricGroup();
         MetricGroup taskGroup = workerTask.taskMetricsGroup().metricGroup();
@@ -1109,4 +1115,20 @@ public class WorkerSourceTaskTest {
             EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
         }
     }
+
+    private void assertShouldSkipCommit() {
+        assertFalse(workerTask.shouldCommitOffsets());
+
+        LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
+        LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class);
+        try (LogCaptureAppender committerAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
+             LogCaptureAppender taskAppender = LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) {
+            SourceTaskOffsetCommitter.commit(workerTask);
+            assertEquals(Collections.emptyList(), taskAppender.getMessages());
+
+            List<String> committerMessages = committerAppender.getMessages();
+            assertEquals(1, committerMessages.size());
+            assertTrue(committerMessages.get(0).contains("Skipping offset commit"));
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 82f9e0395e3..8abedd740e9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime.rest;
 
-import kafka.utils.LogCaptureAppender;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.http.HttpHost;
@@ -31,6 +30,7 @@ import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -362,10 +362,10 @@ public class RestServerTest {
         // Stop the server to flush all logs
         server.stop();
 
-        Collection<String> logMessages = restServerAppender.getRenderedMessages();
+        Collection<String> logMessages = restServerAppender.getMessages();
         LogCaptureAppender.unregister(restServerAppender);
         restServerAppender.close();
-        String expectedlogContent = "\"GET / HTTP/1.1\" " + String.valueOf(response.getStatusLine().getStatusCode());
+        String expectedlogContent = "\"GET / HTTP/1.1\" " + response.getStatusLine().getStatusCode();
         assertTrue(logMessages.stream().anyMatch(logMessage -> logMessage.contains(expectedlogContent)));
     }
 
diff --git a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
index bec77356e1e..2d071452829 100644
--- a/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
+++ b/core/src/test/scala/unit/kafka/utils/LogCaptureAppender.scala
@@ -20,7 +20,6 @@ package kafka.utils
 import org.apache.log4j.{AppenderSkeleton, Level, Logger}
 import org.apache.log4j.spi.LoggingEvent
 
-import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 
 class LogCaptureAppender extends AppenderSkeleton {
@@ -38,11 +37,6 @@ class LogCaptureAppender extends AppenderSkeleton {
     }
   }
 
-  def getRenderedMessages: java.util.List[String] = {
-    return getMessages.map(e => e.getRenderedMessage).asJava
-  }
-
-
   override def close(): Unit = {
     events.synchronized {
       events.clear()
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b788e54923a..766cd04498e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -58,7 +58,7 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
 import org.apache.kafka.streams.processor.internals.TopologyMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index c1b9e6a140a..496327635ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 59683e44600..b203b46b376 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 354fbcac318..8076475eaed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index d4f716df084..6864396d433 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d162acfa374..dbbe56d4313 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -43,7 +44,6 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index fc993b63a9e..f72f0891185 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -37,8 +37,8 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender.Event;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
index 796afd38845..50b0f8dcdde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
@@ -39,8 +39,8 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender.Event;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 124bb5593b9..23b250e503c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -47,7 +48,6 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForwa
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 1039cb70ddf..0834727234e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 4af2e3934e1..ee73849e9fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index b14973cc934..12e503b3a48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
index eaf74c8660a..9fcffac3755 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -23,8 +23,8 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender.Event;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 70e1bccdbe6..2fc5834b36b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender.Event;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 4ba88b73e66..0644f1b705a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpReadOnlyStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 74cd6cfaa23..9e4964b5647 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -52,7 +52,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 632b0efc23c..599068c0f3c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 9892a13f087..acb7f0da073 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index b3fa516a3f7..c1e8ff8628a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -52,7 +53,6 @@ import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockClientSupplier;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 205f19537ba..c387823fe03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index fbc8d423261..1195550fd28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager.StateStoreMetadata;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.easymock.EasyMock;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 075ecb3ef34..37f32ed4cd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -73,7 +73,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 69688df6f63..b1fa45c58d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -49,7 +49,7 @@ import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory
 import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTasks;
 import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 
 import java.nio.file.Files;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 8a44b9ab501..c81e57589a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -45,7 +45,7 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 19b057a8c98..9e0fb306f28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index b82f544026e..6e1b3bfcf8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
@@ -44,7 +45,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 78d7f08ee84..efbcd8855a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -34,7 +35,6 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index c69dedc91db..d29c6bf88d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -34,7 +34,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
index d5aa667c0c5..7e0787e6fee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -35,7 +36,6 @@ import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
index 50fd88a2769..54abbd4cc52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
@@ -34,7 +35,6 @@ import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 72779314e85..27fddc9b075 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,7 +39,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 4cafecfa2c3..8eec0a40056 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.easymock.EasyMockRunner;
 import org.junit.Test;
 import org.junit.runner.RunWith;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index a1d511ae1ca..58e2f9c3a47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.hamcrest.core.IsNull;
 import org.junit.Test;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index 1ee797d35ec..90b17ded1d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StoreBuilder;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index bf597fb789b..bcc1005eb41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StoreBuilder;