You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/09/07 08:25:45 UTC

[kafka] branch trunk updated: KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdfe4f98c4 KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)
cdfe4f98c4 is described below

commit cdfe4f98c4816cdc21f6a0c0d789dc530830427f
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Wed Sep 7 09:25:31 2022 +0100

    KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)
    
    This is batch 1 of the tests listed in https://issues.apache.org/jira/browse/KAFKA-14133 that use EasyMock and need to be migrated to Mockito.
    
    Reviewers: Dalibor Plavcic, Matthew de Detrich <md...@gmail.com>, Bruno Cadonna <ca...@apache.org>
---
 .../internals/KStreamFlatTransformTest.java        | 62 +++++++++----------
 .../internals/KStreamFlatTransformValuesTest.java  | 60 +++++++++---------
 .../kstream/internals/KStreamPrintTest.java        | 10 ++-
 .../kstream/internals/KStreamRepartitionTest.java  | 28 ++++-----
 .../internals/MaterializedInternalTest.java        | 22 +++----
 .../internals/TransformerSupplierAdapterTest.java  | 67 +++++++++-----------
 .../KTableSuppressProcessorMetricsTest.java        |  8 ++-
 .../processor/internals/ClientUtilsTest.java       | 72 +++++++++++-----------
 ...ghAvailabilityStreamsPartitionAssignorTest.java | 45 +++++++-------
 9 files changed, 179 insertions(+), 195 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 68fef1eee9..2a3042e2fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -23,23 +23,35 @@ import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFl
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Arrays;
 import java.util.Collections;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class KStreamFlatTransformTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KStreamFlatTransformTest {
 
     private Number inputKey;
     private Number inputValue;
 
+    @Mock
     private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
+    @Mock
     private InternalProcessorContext<Integer, Integer> context;
+    private InOrder inOrder;
 
     private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
 
@@ -47,19 +59,15 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
     public void setUp() {
         inputKey = 1;
         inputValue = 10;
-        transformer = mock(Transformer.class);
-        context = strictMock(InternalProcessorContext.class);
+        inOrder = inOrder(context);
         processor = new KStreamFlatTransformProcessor<>(transformer);
     }
 
     @Test
     public void shouldInitialiseFlatTransformProcessor() {
-        transformer.init(context);
-        replayAll();
-
         processor.init(context);
 
-        verifyAll();
+        verify(transformer).init(context);
     }
 
     @Test
@@ -68,71 +76,59 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
                 KeyValue.pair(2, 20),
                 KeyValue.pair(3, 30),
                 KeyValue.pair(4, 40));
+
         processor.init(context);
-        EasyMock.reset(transformer);
 
-        EasyMock.expect(transformer.transform(inputKey, inputValue)).andReturn(outputRecords);
-        for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
-            context.forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
-        }
-        replayAll();
+        when(transformer.transform(inputKey, inputValue)).thenReturn(outputRecords);
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
+            inOrder.verify(context).forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
+        }
     }
 
     @Test
     public void shouldAllowEmptyListAsResultOfTransform() {
         processor.init(context);
-        EasyMock.reset(transformer);
 
-        EasyMock.expect(transformer.transform(inputKey, inputValue))
-            .andReturn(Collections.emptyList());
-        replayAll();
+        when(transformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
     }
 
     @Test
     public void shouldAllowNullAsResultOfTransform() {
         processor.init(context);
-        EasyMock.reset(transformer);
 
-        EasyMock.expect(transformer.transform(inputKey, inputValue))
-            .andReturn(null);
-        replayAll();
+        when(transformer.transform(inputKey, inputValue)).thenReturn(null);
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
     }
 
     @Test
     public void shouldCloseFlatTransformProcessor() {
-        transformer.close();
-        replayAll();
-
         processor.close();
 
-        verifyAll();
+        verify(transformer).close();
     }
 
     @Test
     public void shouldGetFlatTransformProcessor() {
+        @SuppressWarnings("unchecked")
         final TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformerSupplier =
             mock(TransformerSupplier.class);
         final KStreamFlatTransform<Number, Number, Integer, Integer> processorSupplier =
             new KStreamFlatTransform<>(transformerSupplier);
 
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer);
 
         final Processor<Number, Number, Integer, Integer> processor = processorSupplier.get();
 
-        verifyAll();
         assertTrue(processor instanceof KStreamFlatTransformProcessor);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index fcb75e78c0..988eab92bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -17,6 +17,11 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -28,18 +33,25 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
-public class KStreamFlatTransformValuesTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KStreamFlatTransformValuesTest {
 
     private Integer inputKey;
     private Integer inputValue;
 
+    @Mock
     private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
+    @Mock
     private InternalProcessorContext<Integer, String> context;
+    private InOrder inOrder;
 
     private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
 
@@ -47,19 +59,15 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
     public void setUp() {
         inputKey = 1;
         inputValue = 10;
-        valueTransformer = mock(ValueTransformerWithKey.class);
-        context = strictMock(InternalProcessorContext.class);
+        inOrder = inOrder(context);
         processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
     }
 
     @Test
     public void shouldInitializeFlatTransformValuesProcessor() {
-        valueTransformer.init(EasyMock.isA(ForwardingDisabledProcessorContext.class));
-        replayAll();
-
         processor.init(context);
 
-        verifyAll();
+        verify(valueTransformer).init(ArgumentMatchers.isA(ForwardingDisabledProcessorContext.class));
     }
 
     @Test
@@ -68,69 +76,59 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
                 "Hello",
                 "Blue",
                 "Planet");
+
         processor.init(context);
-        EasyMock.reset(valueTransformer);
 
-        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
-        for (final String outputValue : outputValues) {
-            context.forward(new Record<>(inputKey, outputValue, 0L));
-        }
-        replayAll();
+        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(outputValues);
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        for (final String outputValue : outputValues) {
+            inOrder.verify(context).forward(new Record<>(inputKey, outputValue, 0L));
+        }
     }
 
     @Test
     public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
         processor.init(context);
-        EasyMock.reset(valueTransformer);
 
-        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.emptyList());
-        replayAll();
+        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
     }
 
     @Test
     public void shouldEmitNoRecordIfTransformReturnsNull() {
         processor.init(context);
-        EasyMock.reset(valueTransformer);
 
-        EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
-        replayAll();
+        when(valueTransformer.transform(inputKey, inputValue)).thenReturn(null);
 
         processor.process(new Record<>(inputKey, inputValue, 0L));
 
-        verifyAll();
+        inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
     }
 
     @Test
     public void shouldCloseFlatTransformValuesProcessor() {
-        valueTransformer.close();
-        replayAll();
-
         processor.close();
 
-        verifyAll();
+        verify(valueTransformer).close();
     }
 
     @Test
     public void shouldGetFlatTransformValuesProcessor() {
+        @SuppressWarnings("unchecked")
         final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
             mock(ValueTransformerWithKeySupplier.class);
         final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
             new KStreamFlatTransformValues<>(valueTransformerSupplier);
 
-        EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
-        replayAll();
+        when(valueTransformerSupplier.get()).thenReturn(valueTransformer);
 
         final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
 
-        verifyAll();
         assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 2915a11879..591b81a32b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -20,9 +20,11 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
@@ -31,11 +33,15 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KStreamPrintTest {
 
     private ByteArrayOutputStream byteOutStream;
     private Processor<Integer, String, Void, Void> printProcessor;
 
+    @Mock
+    private ProcessorContext<Void, Void> processorContext;
+
     @Before
     public void setUp() {
         byteOutStream = new ByteArrayOutputStream();
@@ -46,8 +52,6 @@ public class KStreamPrintTest {
             "test-stream"));
 
         printProcessor = kStreamPrint.get();
-        final ProcessorContext<Void, Void> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
-        EasyMock.replay(processorContext);
 
         printProcessor.init(processorContext);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
index 0344f46db8..c7669a978a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
@@ -35,11 +35,10 @@ import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -47,20 +46,20 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
 
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @SuppressWarnings("deprecation")
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KStreamRepartitionTest {
     private final String inputTopic = "input-topic";
 
@@ -76,11 +75,11 @@ public class KStreamRepartitionTest {
     @Test
     public void shouldInvokePartitionerWhenSet() {
         final int[] expectedKeys = new int[]{0, 1};
-        final StreamPartitioner<Integer, String> streamPartitionerMock = EasyMock.mock(StreamPartitioner.class);
+        @SuppressWarnings("unchecked")
+        final StreamPartitioner<Integer, String> streamPartitionerMock = mock(StreamPartitioner.class);
 
-        expect(streamPartitionerMock.partition(anyString(), eq(0), eq("X0"), anyInt())).andReturn(1).times(1);
-        expect(streamPartitionerMock.partition(anyString(), eq(1), eq("X1"), anyInt())).andReturn(1).times(1);
-        replay(streamPartitionerMock);
+        when(streamPartitionerMock.partition(anyString(), eq(0), eq("X0"), anyInt())).thenReturn(1);
+        when(streamPartitionerMock.partition(anyString(), eq(1), eq("X1"), anyInt())).thenReturn(1);
 
         final String repartitionOperationName = "test";
         final Repartitioned<Integer, String> repartitioned = Repartitioned
@@ -112,7 +111,8 @@ public class KStreamRepartitionTest {
             assertTrue(testOutputTopic.readRecordsToList().isEmpty());
         }
 
-        verify(streamPartitionerMock);
+        verify(streamPartitionerMock).partition(anyString(), eq(0), eq("X0"), anyInt());
+        verify(streamPartitionerMock).partition(anyString(), eq(1), eq("X1"), anyInt());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 302845a973..b805dc273a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -26,40 +26,35 @@ import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.when;
 
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class MaterializedInternalTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalNameProvider nameProvider;
-
-    @Mock(type = MockType.NICE)
+    @Mock
     private KeyValueBytesStoreSupplier supplier;
     private final String prefix = "prefix";
 
     @Test
     public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
         final String generatedName = prefix + "-store";
-        EasyMock.expect(nameProvider.newStoreName(prefix)).andReturn(generatedName);
-
-        EasyMock.replay(nameProvider);
+        when(nameProvider.newStoreName(prefix)).thenReturn(generatedName);
 
         final MaterializedInternal<Object, Object, StateStore> materialized =
             new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
 
         assertThat(materialized.storeName(), equalTo(generatedName));
-        EasyMock.verify(nameProvider);
     }
 
     @Test
@@ -73,8 +68,7 @@ public class MaterializedInternalTest {
     @Test
     public void shouldUseStoreNameOfSupplierWhenProvided() {
         final String storeName = "other-store-name";
-        EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
-        EasyMock.replay(supplier);
+        when(supplier.name()).thenReturn(storeName);
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
             new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
index 1eb55d0a08..9dc436bd60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
@@ -24,85 +24,77 @@ import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
-import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.core.IsSame.sameInstance;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class TransformerSupplierAdapterTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class TransformerSupplierAdapterTest {
 
+    @Mock
     private ProcessorContext context;
+    @Mock
     private Transformer<String, String, KeyValue<Integer, Integer>> transformer;
+    @Mock
     private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier;
+    @Mock
     private Set<StoreBuilder<?>> stores;
 
     final String key = "Hello";
     final String value = "World";
 
-    @Before
-    public void before() {
-        context = mock(ProcessorContext.class);
-        transformer = mock(Transformer.class);
-        transformerSupplier = mock(TransformerSupplier.class);
-        stores = mock(Set.class);
-    }
-
     @Test
     public void shouldCallInitOfAdapteeTransformer() {
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
-        transformer.init(context);
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
         final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
         adaptedTransformer.init(context);
 
-        verifyAll();
+        verify(transformer).init(context);
     }
 
     @Test
     public void shouldCallCloseOfAdapteeTransformer() {
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
-        transformer.close();
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
         final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
         adaptedTransformer.close();
 
-        verifyAll();
+        verify(transformer).close();
     }
 
     @Test
     public void shouldCallStoresOfAdapteeTransformerSupplier() {
-        EasyMock.expect(transformerSupplier.stores()).andReturn(stores);
-        replayAll();
+        when(transformerSupplier.stores()).thenReturn(stores);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
         adapter.stores();
-        verifyAll();
     }
 
     @Test
     public void shouldCallTransformOfAdapteeTransformerAndReturnSingletonIterable() {
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
-        EasyMock.expect(transformer.transform(key, value)).andReturn(KeyValue.pair(0, 1));
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer);
+        when(transformer.transform(key, value)).thenReturn(KeyValue.pair(0, 1));
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
         final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
         final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
 
-        verifyAll();
         assertThat(iterator.hasNext(), equalTo(true));
         iterator.next();
         assertThat(iterator.hasNext(), equalTo(false));
@@ -110,31 +102,26 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
 
     @Test
     public void shouldCallTransformOfAdapteeTransformerAndReturnEmptyIterable() {
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
-        EasyMock.expect(transformer.transform(key, value)).andReturn(null);
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer);
+        when(transformer.transform(key, value)).thenReturn(null);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
         final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
         final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
 
-        verifyAll();
         assertThat(iterator.hasNext(), equalTo(false));
     }
 
     @Test
     public void shouldAlwaysGetNewAdapterTransformer() {
+        @SuppressWarnings("unchecked")
         final Transformer<String, String, KeyValue<Integer, Integer>> transformer1 = mock(Transformer.class);
+        @SuppressWarnings("unchecked")
         final Transformer<String, String, KeyValue<Integer, Integer>> transformer2 = mock(Transformer.class);
+        @SuppressWarnings("unchecked")
         final Transformer<String, String, KeyValue<Integer, Integer>> transformer3 = mock(Transformer.class);
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer1);
-        transformer1.init(context);
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer2);
-        transformer2.init(context);
-        EasyMock.expect(transformerSupplier.get()).andReturn(transformer3);
-        transformer3.init(context);
-        replayAll();
+        when(transformerSupplier.get()).thenReturn(transformer1).thenReturn(transformer2).thenReturn(transformer3);
 
         final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
             new TransformerSupplierAdapter<>(transformerSupplier);
@@ -145,10 +132,12 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
         final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer3 = adapter.get();
         adapterTransformer3.init(context);
 
-        verifyAll();
         assertThat(adapterTransformer1, not(sameInstance(adapterTransformer2)));
         assertThat(adapterTransformer2, not(sameInstance(adapterTransformer3)));
         assertThat(adapterTransformer3, not(sameInstance(adapterTransformer1)));
+        verify(transformer1).init(context);
+        verify(transformer2).init(context);
+        verify(transformer3).init(context);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 009a70d2a2..953482f2a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -35,9 +35,10 @@ import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffe
 import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
 import org.hamcrest.Matcher;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.time.Duration;
 import java.util.Map;
@@ -49,7 +50,9 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class KTableSuppressProcessorMetricsTest {
     private static final long ARBITRARY_LONG = 5L;
     private static final TaskId TASK_ID = new TaskId(0, 0);
@@ -133,7 +136,8 @@ public class KTableSuppressProcessorMetricsTest {
             .withLoggingDisabled()
             .build();
 
-        final KTableImpl<String, ?, Long> mock = EasyMock.mock(KTableImpl.class);
+        @SuppressWarnings("unchecked")
+        final KTableImpl<String, ?, Long> mock = mock(KTableImpl.class);
         final Processor<String, Change<Long>, String, Change<Long>> processor =
             new KTableSuppressProcessorSupplier<>(
                 (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index d715a3f975..4ce003f695 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -36,8 +36,9 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.easymock.EasyMock;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptySet;
@@ -47,14 +48,15 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchComm
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ClientUtilsTest {
 
     // consumer and producer records use utf8 encoding for topic name, header keys, etc
@@ -108,75 +110,73 @@ public class ClientUtilsTest {
 
     @Test
     public void fetchCommittedOffsetsShouldRethrowKafkaExceptionAsStreamsException() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
-        expect(consumer.committed(PARTITIONS)).andThrow(new KafkaException());
-        replay(consumer);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        when(consumer.committed(PARTITIONS)).thenThrow(new KafkaException());
         assertThrows(StreamsException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
     }
 
     @Test
     public void fetchCommittedOffsetsShouldRethrowTimeoutException() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
-        expect(consumer.committed(PARTITIONS)).andThrow(new TimeoutException());
-        replay(consumer);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        when(consumer.committed(PARTITIONS)).thenThrow(new TimeoutException());
         assertThrows(TimeoutException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
     }
 
     @Test
     public void fetchCommittedOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
-        final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
         assertTrue(fetchCommittedOffsets(emptySet(), consumer).isEmpty());
     }
 
     @Test
     public void fetchEndOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
-        final Admin adminClient = EasyMock.createMock(AdminClient.class);
+        final Admin adminClient = mock(AdminClient.class);
         assertTrue(fetchEndOffsets(emptySet(), adminClient).isEmpty());
     }
 
     @Test
     public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() throws Exception {
-        final Admin adminClient = EasyMock.createMock(AdminClient.class);
-        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
-        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+        final Admin adminClient = mock(AdminClient.class);
+        final ListOffsetsResult result = mock(ListOffsetsResult.class);
+        @SuppressWarnings("unchecked")
+        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
 
-        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
-        EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new RuntimeException());
-        replay(adminClient, result, allFuture);
+        when(adminClient.listOffsets(any())).thenReturn(result);
+        when(result.all()).thenReturn(allFuture);
+        when(allFuture.get()).thenThrow(new RuntimeException());
 
         assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
-        verify(adminClient);
     }
 
     @Test
     public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
-        final Admin adminClient = EasyMock.createMock(AdminClient.class);
-        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
-        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+        final Admin adminClient = mock(AdminClient.class);
+        final ListOffsetsResult result = mock(ListOffsetsResult.class);
+        @SuppressWarnings("unchecked")
+        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
 
-        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
-        EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
-        replay(adminClient, result, allFuture);
+        when(adminClient.listOffsets(any())).thenReturn(result);
+        when(result.all()).thenReturn(allFuture);
+        when(allFuture.get()).thenThrow(new InterruptedException());
 
         assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
-        verify(adminClient);
     }
 
     @Test
     public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws Exception {
-        final Admin adminClient = EasyMock.createMock(AdminClient.class);
-        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
-        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+        final Admin adminClient = mock(AdminClient.class);
+        final ListOffsetsResult result = mock(ListOffsetsResult.class);
+        @SuppressWarnings("unchecked")
+        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
 
-        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
-        EasyMock.expect(result.all()).andStubReturn(allFuture);
-        EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
-        replay(adminClient, result, allFuture);
+        when(adminClient.listOffsets(any())).thenReturn(result);
+        when(result.all()).thenReturn(allFuture);
+        when(allFuture.get()).thenThrow(new ExecutionException(new RuntimeException()));
 
         assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
-        verify(adminClient);
     }
     
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index e67793d676..331f0aebcc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -42,9 +41,11 @@ import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockKeyValueStoreBuilder;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -69,14 +70,16 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class HighAvailabilityStreamsPartitionAssignorTest {
 
     private final List<PartitionInfo> infos = asList(
@@ -104,12 +107,17 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
     private static final String USER_END_POINT = "localhost:8080";
     private static final String APPLICATION_ID = "stream-partition-assignor-test";
 
+    @Mock
     private TaskManager taskManager;
+    @Mock
     private Admin adminClient;
     private StreamsConfig streamsConfig = new StreamsConfig(configProps());
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private TopologyMetadata topologyMetadata = new TopologyMetadata(builder, streamsConfig);
-    private final StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
+    @Mock
+    private StreamsMetadataState streamsMetadataState;
+    @Mock
+    private Consumer<byte[], byte[]> consumer;
     private final Map<String, Subscription> subscriptions = new HashMap<>();
 
     private ReferenceContainer referenceContainer;
@@ -120,7 +128,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
         configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
         configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
         referenceContainer = new ReferenceContainer();
-        referenceContainer.mainConsumer = EasyMock.mock(Consumer.class);
+        referenceContainer.mainConsumer = consumer;
         referenceContainer.adminClient = adminClient;
         referenceContainer.taskManager = taskManager;
         referenceContainer.streamsMetadataState = streamsMetadataState;
@@ -137,7 +145,6 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
         streamsConfig = new StreamsConfig(configMap);
         topologyMetadata = new TopologyMetadata(builder, streamsConfig);
         partitionAssignor.configure(configMap);
-        EasyMock.replay(taskManager, adminClient);
 
         overwriteInternalTopicManagerWithMock();
     }
@@ -148,34 +155,27 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
     }
 
     private void createMockTaskManager(final Map<TaskId, Long> taskOffsetSums) {
-        taskManager = EasyMock.createNiceMock(TaskManager.class);
-        expect(taskManager.topologyMetadata()).andStubReturn(topologyMetadata);
-        expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes();
-        expect(taskManager.processId()).andReturn(UUID_1).anyTimes();
+        when(taskManager.topologyMetadata()).thenReturn(topologyMetadata);
+        when(taskManager.processId()).thenReturn(UUID_1);
         topologyMetadata.buildAndRewriteTopology();
     }
 
     // If you don't care about setting the end offsets for each specific topic partition, the helper method
     // getTopicPartitionOffsetMap is useful for building this input map for all partitions
     private void createMockAdminClient(final Map<TopicPartition, Long> changelogEndOffsets) {
-        adminClient = EasyMock.createMock(AdminClient.class);
-
-        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+        final ListOffsetsResult result = mock(ListOffsetsResult.class);
         final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>();
         allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap(
             Entry::getKey,
             t -> {
-                final ListOffsetsResultInfo info = EasyMock.createNiceMock(ListOffsetsResultInfo.class);
-                expect(info.offset()).andStubReturn(t.getValue());
-                EasyMock.replay(info);
+                final ListOffsetsResultInfo info = mock(ListOffsetsResultInfo.class);
+                when(info.offset()).thenReturn(t.getValue());
                 return info;
             }))
         );
 
-        expect(adminClient.listOffsets(anyObject())).andStubReturn(result);
-        expect(result.all()).andReturn(allFuture);
-
-        EasyMock.replay(result);
+        when(adminClient.listOffsets(any())).thenReturn(result);
+        when(result.all()).thenReturn(allFuture);
     }
 
     private void overwriteInternalTopicManagerWithMock() {
@@ -204,8 +204,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
         final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
 
         createMockTaskManager(allTasks);
-        adminClient = EasyMock.createMock(AdminClient.class);
-        expect(adminClient.listOffsets(anyObject())).andThrow(new StreamsException("Should be handled"));
+        when(adminClient.listOffsets(any())).thenThrow(new StreamsException("Should be handled"));
         configurePartitionAssignorWith(singletonMap(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, rebalanceInterval));
 
         final String firstConsumer = "consumer1";