You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/09/07 08:25:45 UTC
[kafka] branch trunk updated: KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdfe4f98c4 KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)
cdfe4f98c4 is described below
commit cdfe4f98c4816cdc21f6a0c0d789dc530830427f
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Wed Sep 7 09:25:31 2022 +0100
KAFKA-14133: Replace EasyMock with Mockito in streams tests (#12492)
This is batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 that use EasyMock and need to be migrated to Mockito.
Reviewers: Dalibor Plavcic, Matthew de Detrich <md...@gmail.com>, Bruno Cadonna <ca...@apache.org>
---
.../internals/KStreamFlatTransformTest.java | 62 +++++++++----------
.../internals/KStreamFlatTransformValuesTest.java | 60 +++++++++---------
.../kstream/internals/KStreamPrintTest.java | 10 ++-
.../kstream/internals/KStreamRepartitionTest.java | 28 ++++-----
.../internals/MaterializedInternalTest.java | 22 +++----
.../internals/TransformerSupplierAdapterTest.java | 67 +++++++++-----------
.../KTableSuppressProcessorMetricsTest.java | 8 ++-
.../processor/internals/ClientUtilsTest.java | 72 +++++++++++-----------
...ghAvailabilityStreamsPartitionAssignorTest.java | 45 +++++++-------
9 files changed, 179 insertions(+), 195 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index 68fef1eee9..2a3042e2fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -23,23 +23,35 @@ import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFl
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-public class KStreamFlatTransformTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KStreamFlatTransformTest {
private Number inputKey;
private Number inputValue;
+ @Mock
private Transformer<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformer;
+ @Mock
private InternalProcessorContext<Integer, Integer> context;
+ private InOrder inOrder;
private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
@@ -47,19 +59,15 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
public void setUp() {
inputKey = 1;
inputValue = 10;
- transformer = mock(Transformer.class);
- context = strictMock(InternalProcessorContext.class);
+ inOrder = inOrder(context);
processor = new KStreamFlatTransformProcessor<>(transformer);
}
@Test
public void shouldInitialiseFlatTransformProcessor() {
- transformer.init(context);
- replayAll();
-
processor.init(context);
- verifyAll();
+ verify(transformer).init(context);
}
@Test
@@ -68,71 +76,59 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
KeyValue.pair(2, 20),
KeyValue.pair(3, 30),
KeyValue.pair(4, 40));
+
processor.init(context);
- EasyMock.reset(transformer);
- EasyMock.expect(transformer.transform(inputKey, inputValue)).andReturn(outputRecords);
- for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
- context.forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
- }
- replayAll();
+ when(transformer.transform(inputKey, inputValue)).thenReturn(outputRecords);
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ for (final KeyValue<Integer, Integer> outputRecord : outputRecords) {
+ inOrder.verify(context).forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
+ }
}
@Test
public void shouldAllowEmptyListAsResultOfTransform() {
processor.init(context);
- EasyMock.reset(transformer);
- EasyMock.expect(transformer.transform(inputKey, inputValue))
- .andReturn(Collections.emptyList());
- replayAll();
+ when(transformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
}
@Test
public void shouldAllowNullAsResultOfTransform() {
processor.init(context);
- EasyMock.reset(transformer);
- EasyMock.expect(transformer.transform(inputKey, inputValue))
- .andReturn(null);
- replayAll();
+ when(transformer.transform(inputKey, inputValue)).thenReturn(null);
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, Integer>>any());
}
@Test
public void shouldCloseFlatTransformProcessor() {
- transformer.close();
- replayAll();
-
processor.close();
- verifyAll();
+ verify(transformer).close();
}
@Test
public void shouldGetFlatTransformProcessor() {
+ @SuppressWarnings("unchecked")
final TransformerSupplier<Number, Number, Iterable<KeyValue<Integer, Integer>>> transformerSupplier =
mock(TransformerSupplier.class);
final KStreamFlatTransform<Number, Number, Integer, Integer> processorSupplier =
new KStreamFlatTransform<>(transformerSupplier);
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer);
final Processor<Number, Number, Integer, Integer> processor = processorSupplier.get();
- verifyAll();
assertTrue(processor instanceof KStreamFlatTransformProcessor);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
index fcb75e78c0..988eab92bf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
@@ -17,6 +17,11 @@
package org.apache.kafka.streams.kstream.internals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
@@ -28,18 +33,25 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
-public class KStreamFlatTransformValuesTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class KStreamFlatTransformValuesTest {
private Integer inputKey;
private Integer inputValue;
+ @Mock
private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
+ @Mock
private InternalProcessorContext<Integer, String> context;
+ private InOrder inOrder;
private KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;
@@ -47,19 +59,15 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
public void setUp() {
inputKey = 1;
inputValue = 10;
- valueTransformer = mock(ValueTransformerWithKey.class);
- context = strictMock(InternalProcessorContext.class);
+ inOrder = inOrder(context);
processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
}
@Test
public void shouldInitializeFlatTransformValuesProcessor() {
- valueTransformer.init(EasyMock.isA(ForwardingDisabledProcessorContext.class));
- replayAll();
-
processor.init(context);
- verifyAll();
+ verify(valueTransformer).init(ArgumentMatchers.isA(ForwardingDisabledProcessorContext.class));
}
@Test
@@ -68,69 +76,59 @@ public class KStreamFlatTransformValuesTest extends EasyMockSupport {
"Hello",
"Blue",
"Planet");
+
processor.init(context);
- EasyMock.reset(valueTransformer);
- EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(outputValues);
- for (final String outputValue : outputValues) {
- context.forward(new Record<>(inputKey, outputValue, 0L));
- }
- replayAll();
+ when(valueTransformer.transform(inputKey, inputValue)).thenReturn(outputValues);
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ for (final String outputValue : outputValues) {
+ inOrder.verify(context).forward(new Record<>(inputKey, outputValue, 0L));
+ }
}
@Test
public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
processor.init(context);
- EasyMock.reset(valueTransformer);
- EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(Collections.emptyList());
- replayAll();
+ when(valueTransformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
}
@Test
public void shouldEmitNoRecordIfTransformReturnsNull() {
processor.init(context);
- EasyMock.reset(valueTransformer);
- EasyMock.expect(valueTransformer.transform(inputKey, inputValue)).andReturn(null);
- replayAll();
+ when(valueTransformer.transform(inputKey, inputValue)).thenReturn(null);
processor.process(new Record<>(inputKey, inputValue, 0L));
- verifyAll();
+ inOrder.verify(context, never()).forward(ArgumentMatchers.<Record<Integer, String>>any());
}
@Test
public void shouldCloseFlatTransformValuesProcessor() {
- valueTransformer.close();
- replayAll();
-
processor.close();
- verifyAll();
+ verify(valueTransformer).close();
}
@Test
public void shouldGetFlatTransformValuesProcessor() {
+ @SuppressWarnings("unchecked")
final ValueTransformerWithKeySupplier<Integer, Integer, Iterable<String>> valueTransformerSupplier =
mock(ValueTransformerWithKeySupplier.class);
final KStreamFlatTransformValues<Integer, Integer, String> processorSupplier =
new KStreamFlatTransformValues<>(valueTransformerSupplier);
- EasyMock.expect(valueTransformerSupplier.get()).andReturn(valueTransformer);
- replayAll();
+ when(valueTransformerSupplier.get()).thenReturn(valueTransformer);
final Processor<Integer, Integer, Integer, String> processor = processorSupplier.get();
- verifyAll();
assertTrue(processor instanceof KStreamFlatTransformValuesProcessor);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
index 2915a11879..591b81a32b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -20,9 +20,11 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
@@ -31,11 +33,15 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KStreamPrintTest {
private ByteArrayOutputStream byteOutStream;
private Processor<Integer, String, Void, Void> printProcessor;
+ @Mock
+ private ProcessorContext<Void, Void> processorContext;
+
@Before
public void setUp() {
byteOutStream = new ByteArrayOutputStream();
@@ -46,8 +52,6 @@ public class KStreamPrintTest {
"test-stream"));
printProcessor = kStreamPrint.get();
- final ProcessorContext<Void, Void> processorContext = EasyMock.createNiceMock(ProcessorContext.class);
- EasyMock.replay(processorContext);
printProcessor.init(processorContext);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
index 0344f46db8..c7669a978a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.java
@@ -35,11 +35,10 @@ import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import java.time.Duration;
import java.time.Instant;
@@ -47,20 +46,20 @@ import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
-import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KStreamRepartitionTest {
private final String inputTopic = "input-topic";
@@ -76,11 +75,11 @@ public class KStreamRepartitionTest {
@Test
public void shouldInvokePartitionerWhenSet() {
final int[] expectedKeys = new int[]{0, 1};
- final StreamPartitioner<Integer, String> streamPartitionerMock = EasyMock.mock(StreamPartitioner.class);
+ @SuppressWarnings("unchecked")
+ final StreamPartitioner<Integer, String> streamPartitionerMock = mock(StreamPartitioner.class);
- expect(streamPartitionerMock.partition(anyString(), eq(0), eq("X0"), anyInt())).andReturn(1).times(1);
- expect(streamPartitionerMock.partition(anyString(), eq(1), eq("X1"), anyInt())).andReturn(1).times(1);
- replay(streamPartitionerMock);
+ when(streamPartitionerMock.partition(anyString(), eq(0), eq("X0"), anyInt())).thenReturn(1);
+ when(streamPartitionerMock.partition(anyString(), eq(1), eq("X1"), anyInt())).thenReturn(1);
final String repartitionOperationName = "test";
final Repartitioned<Integer, String> repartitioned = Repartitioned
@@ -112,7 +111,8 @@ public class KStreamRepartitionTest {
assertTrue(testOutputTopic.readRecordsToList().isEmpty());
}
- verify(streamPartitionerMock);
+ verify(streamPartitionerMock).partition(anyString(), eq(0), eq("X0"), anyInt());
+ verify(streamPartitionerMock).partition(anyString(), eq(1), eq("X1"), anyInt());
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 302845a973..b805dc273a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -26,40 +26,35 @@ import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.StreamsTestUtils;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
-import org.easymock.MockType;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.when;
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MaterializedInternalTest {
- @Mock(type = MockType.NICE)
+ @Mock
private InternalNameProvider nameProvider;
-
- @Mock(type = MockType.NICE)
+ @Mock
private KeyValueBytesStoreSupplier supplier;
private final String prefix = "prefix";
@Test
public void shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
final String generatedName = prefix + "-store";
- EasyMock.expect(nameProvider.newStoreName(prefix)).andReturn(generatedName);
-
- EasyMock.replay(nameProvider);
+ when(nameProvider.newStoreName(prefix)).thenReturn(generatedName);
final MaterializedInternal<Object, Object, StateStore> materialized =
new MaterializedInternal<>(Materialized.with(null, null), nameProvider, prefix);
assertThat(materialized.storeName(), equalTo(generatedName));
- EasyMock.verify(nameProvider);
}
@Test
@@ -73,8 +68,7 @@ public class MaterializedInternalTest {
@Test
public void shouldUseStoreNameOfSupplierWhenProvided() {
final String storeName = "other-store-name";
- EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
- EasyMock.replay(supplier);
+ when(supplier.name()).thenReturn(storeName);
final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized =
new MaterializedInternal<>(Materialized.as(supplier), nameProvider, prefix);
assertThat(materialized.storeName(), equalTo(storeName));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
index 1eb55d0a08..9dc436bd60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
@@ -24,85 +24,77 @@ import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
-import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsSame.sameInstance;
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-public class TransformerSupplierAdapterTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class TransformerSupplierAdapterTest {
+ @Mock
private ProcessorContext context;
+ @Mock
private Transformer<String, String, KeyValue<Integer, Integer>> transformer;
+ @Mock
private TransformerSupplier<String, String, KeyValue<Integer, Integer>> transformerSupplier;
+ @Mock
private Set<StoreBuilder<?>> stores;
final String key = "Hello";
final String value = "World";
- @Before
- public void before() {
- context = mock(ProcessorContext.class);
- transformer = mock(Transformer.class);
- transformerSupplier = mock(TransformerSupplier.class);
- stores = mock(Set.class);
- }
-
@Test
public void shouldCallInitOfAdapteeTransformer() {
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
- transformer.init(context);
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
adaptedTransformer.init(context);
- verifyAll();
+ verify(transformer).init(context);
}
@Test
public void shouldCallCloseOfAdapteeTransformer() {
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
- transformer.close();
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
adaptedTransformer.close();
- verifyAll();
+ verify(transformer).close();
}
@Test
public void shouldCallStoresOfAdapteeTransformerSupplier() {
- EasyMock.expect(transformerSupplier.stores()).andReturn(stores);
- replayAll();
+ when(transformerSupplier.stores()).thenReturn(stores);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
adapter.stores();
- verifyAll();
}
@Test
public void shouldCallTransformOfAdapteeTransformerAndReturnSingletonIterable() {
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
- EasyMock.expect(transformer.transform(key, value)).andReturn(KeyValue.pair(0, 1));
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer);
+ when(transformer.transform(key, value)).thenReturn(KeyValue.pair(0, 1));
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
- verifyAll();
assertThat(iterator.hasNext(), equalTo(true));
iterator.next();
assertThat(iterator.hasNext(), equalTo(false));
@@ -110,31 +102,26 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
@Test
public void shouldCallTransformOfAdapteeTransformerAndReturnEmptyIterable() {
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer);
- EasyMock.expect(transformer.transform(key, value)).andReturn(null);
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer);
+ when(transformer.transform(key, value)).thenReturn(null);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adaptedTransformer = adapter.get();
final Iterator<KeyValue<Integer, Integer>> iterator = adaptedTransformer.transform(key, value).iterator();
- verifyAll();
assertThat(iterator.hasNext(), equalTo(false));
}
@Test
public void shouldAlwaysGetNewAdapterTransformer() {
+ @SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer1 = mock(Transformer.class);
+ @SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer2 = mock(Transformer.class);
+ @SuppressWarnings("unchecked")
final Transformer<String, String, KeyValue<Integer, Integer>> transformer3 = mock(Transformer.class);
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer1);
- transformer1.init(context);
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer2);
- transformer2.init(context);
- EasyMock.expect(transformerSupplier.get()).andReturn(transformer3);
- transformer3.init(context);
- replayAll();
+ when(transformerSupplier.get()).thenReturn(transformer1).thenReturn(transformer2).thenReturn(transformer3);
final TransformerSupplierAdapter<String, String, Integer, Integer> adapter =
new TransformerSupplierAdapter<>(transformerSupplier);
@@ -145,10 +132,12 @@ public class TransformerSupplierAdapterTest extends EasyMockSupport {
final Transformer<String, String, Iterable<KeyValue<Integer, Integer>>> adapterTransformer3 = adapter.get();
adapterTransformer3.init(context);
- verifyAll();
assertThat(adapterTransformer1, not(sameInstance(adapterTransformer2)));
assertThat(adapterTransformer2, not(sameInstance(adapterTransformer3)));
assertThat(adapterTransformer3, not(sameInstance(adapterTransformer1)));
+ verify(transformer1).init(context);
+ verify(transformer2).init(context);
+ verify(transformer3).init(context);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 009a70d2a2..953482f2a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -35,9 +35,10 @@ import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffe
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import java.time.Duration;
import java.util.Map;
@@ -49,7 +50,9 @@ import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecord
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KTableSuppressProcessorMetricsTest {
private static final long ARBITRARY_LONG = 5L;
private static final TaskId TASK_ID = new TaskId(0, 0);
@@ -133,7 +136,8 @@ public class KTableSuppressProcessorMetricsTest {
.withLoggingDisabled()
.build();
- final KTableImpl<String, ?, Long> mock = EasyMock.mock(KTableImpl.class);
+ @SuppressWarnings("unchecked")
+ final KTableImpl<String, ?, Long> mock = mock(KTableImpl.class);
final Processor<String, Change<Long>, String, Change<Long>> processor =
new KTableSuppressProcessorSupplier<>(
(SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
index d715a3f975..4ce003f695 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ClientUtilsTest.java
@@ -36,8 +36,9 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.streams.errors.StreamsException;
-import org.easymock.EasyMock;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
@@ -47,14 +48,15 @@ import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchComm
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ClientUtilsTest {
// consumer and producer records use utf8 encoding for topic name, header keys, etc
@@ -108,75 +110,73 @@ public class ClientUtilsTest {
@Test
public void fetchCommittedOffsetsShouldRethrowKafkaExceptionAsStreamsException() {
- final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
- expect(consumer.committed(PARTITIONS)).andThrow(new KafkaException());
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+ when(consumer.committed(PARTITIONS)).thenThrow(new KafkaException());
assertThrows(StreamsException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
}
@Test
public void fetchCommittedOffsetsShouldRethrowTimeoutException() {
- final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
- expect(consumer.committed(PARTITIONS)).andThrow(new TimeoutException());
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+ when(consumer.committed(PARTITIONS)).thenThrow(new TimeoutException());
assertThrows(TimeoutException.class, () -> fetchCommittedOffsets(PARTITIONS, consumer));
}
@Test
public void fetchCommittedOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
- final Consumer<byte[], byte[]> consumer = EasyMock.createMock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
assertTrue(fetchCommittedOffsets(emptySet(), consumer).isEmpty());
}
@Test
public void fetchEndOffsetsShouldReturnEmptyMapIfPartitionsAreEmpty() {
- final Admin adminClient = EasyMock.createMock(AdminClient.class);
+ final Admin adminClient = mock(AdminClient.class);
assertTrue(fetchEndOffsets(emptySet(), adminClient).isEmpty());
}
@Test
public void fetchEndOffsetsShouldRethrowRuntimeExceptionAsStreamsException() throws Exception {
- final Admin adminClient = EasyMock.createMock(AdminClient.class);
- final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
- final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+ final Admin adminClient = mock(AdminClient.class);
+ final ListOffsetsResult result = mock(ListOffsetsResult.class);
+ @SuppressWarnings("unchecked")
+ final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
- EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
- EasyMock.expect(result.all()).andStubReturn(allFuture);
- EasyMock.expect(allFuture.get()).andThrow(new RuntimeException());
- replay(adminClient, result, allFuture);
+ when(adminClient.listOffsets(any())).thenReturn(result);
+ when(result.all()).thenReturn(allFuture);
+ when(allFuture.get()).thenThrow(new RuntimeException());
assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
- verify(adminClient);
}
@Test
public void fetchEndOffsetsShouldRethrowInterruptedExceptionAsStreamsException() throws Exception {
- final Admin adminClient = EasyMock.createMock(AdminClient.class);
- final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
- final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+ final Admin adminClient = mock(AdminClient.class);
+ final ListOffsetsResult result = mock(ListOffsetsResult.class);
+ @SuppressWarnings("unchecked")
+ final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
- EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
- EasyMock.expect(result.all()).andStubReturn(allFuture);
- EasyMock.expect(allFuture.get()).andThrow(new InterruptedException());
- replay(adminClient, result, allFuture);
+ when(adminClient.listOffsets(any())).thenReturn(result);
+ when(result.all()).thenReturn(allFuture);
+ when(allFuture.get()).thenThrow(new InterruptedException());
assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
- verify(adminClient);
}
@Test
public void fetchEndOffsetsShouldRethrowExecutionExceptionAsStreamsException() throws Exception {
- final Admin adminClient = EasyMock.createMock(AdminClient.class);
- final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
- final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = EasyMock.createMock(KafkaFuture.class);
+ final Admin adminClient = mock(AdminClient.class);
+ final ListOffsetsResult result = mock(ListOffsetsResult.class);
+ @SuppressWarnings("unchecked")
+ final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = mock(KafkaFuture.class);
- EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
- EasyMock.expect(result.all()).andStubReturn(allFuture);
- EasyMock.expect(allFuture.get()).andThrow(new ExecutionException(new RuntimeException()));
- replay(adminClient, result, allFuture);
+ when(adminClient.listOffsets(any())).thenReturn(result);
+ when(result.all()).thenReturn(allFuture);
+ when(allFuture.get()).thenThrow(new ExecutionException(new RuntimeException()));
assertThrows(StreamsException.class, () -> fetchEndOffsets(PARTITIONS, adminClient));
- verify(adminClient);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index e67793d676..331f0aebcc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
@@ -42,9 +41,11 @@ import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collections;
@@ -69,14 +70,16 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class HighAvailabilityStreamsPartitionAssignorTest {
private final List<PartitionInfo> infos = asList(
@@ -104,12 +107,17 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
private static final String USER_END_POINT = "localhost:8080";
private static final String APPLICATION_ID = "stream-partition-assignor-test";
+ @Mock
private TaskManager taskManager;
+ @Mock
private Admin adminClient;
private StreamsConfig streamsConfig = new StreamsConfig(configProps());
private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
private TopologyMetadata topologyMetadata = new TopologyMetadata(builder, streamsConfig);
- private final StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
+ @Mock
+ private StreamsMetadataState streamsMetadataState;
+ @Mock
+ private Consumer<byte[], byte[]> consumer;
private final Map<String, Subscription> subscriptions = new HashMap<>();
private ReferenceContainer referenceContainer;
@@ -120,7 +128,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
referenceContainer = new ReferenceContainer();
- referenceContainer.mainConsumer = EasyMock.mock(Consumer.class);
+ referenceContainer.mainConsumer = consumer;
referenceContainer.adminClient = adminClient;
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = streamsMetadataState;
@@ -137,7 +145,6 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
streamsConfig = new StreamsConfig(configMap);
topologyMetadata = new TopologyMetadata(builder, streamsConfig);
partitionAssignor.configure(configMap);
- EasyMock.replay(taskManager, adminClient);
overwriteInternalTopicManagerWithMock();
}
@@ -148,34 +155,27 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
}
private void createMockTaskManager(final Map<TaskId, Long> taskOffsetSums) {
- taskManager = EasyMock.createNiceMock(TaskManager.class);
- expect(taskManager.topologyMetadata()).andStubReturn(topologyMetadata);
- expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes();
- expect(taskManager.processId()).andReturn(UUID_1).anyTimes();
+ when(taskManager.topologyMetadata()).thenReturn(topologyMetadata);
+ when(taskManager.processId()).thenReturn(UUID_1);
topologyMetadata.buildAndRewriteTopology();
}
// If you don't care about setting the end offsets for each specific topic partition, the helper method
// getTopicPartitionOffsetMap is useful for building this input map for all partitions
private void createMockAdminClient(final Map<TopicPartition, Long> changelogEndOffsets) {
- adminClient = EasyMock.createMock(AdminClient.class);
-
- final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+ final ListOffsetsResult result = mock(ListOffsetsResult.class);
final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>();
allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap(
Entry::getKey,
t -> {
- final ListOffsetsResultInfo info = EasyMock.createNiceMock(ListOffsetsResultInfo.class);
- expect(info.offset()).andStubReturn(t.getValue());
- EasyMock.replay(info);
+ final ListOffsetsResultInfo info = mock(ListOffsetsResultInfo.class);
+ when(info.offset()).thenReturn(t.getValue());
return info;
}))
);
- expect(adminClient.listOffsets(anyObject())).andStubReturn(result);
- expect(result.all()).andReturn(allFuture);
-
- EasyMock.replay(result);
+ when(adminClient.listOffsets(any())).thenReturn(result);
+ when(result.all()).thenReturn(allFuture);
}
private void overwriteInternalTopicManagerWithMock() {
@@ -204,8 +204,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
createMockTaskManager(allTasks);
- adminClient = EasyMock.createMock(AdminClient.class);
- expect(adminClient.listOffsets(anyObject())).andThrow(new StreamsException("Should be handled"));
+ when(adminClient.listOffsets(any())).thenThrow(new StreamsException("Should be handled"));
configurePartitionAssignorWith(singletonMap(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, rebalanceInterval));
final String firstConsumer = "consumer1";