Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/10 16:55:39 UTC

[GitHub] [kafka] wycccccc opened a new pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

wycccccc opened a new pull request #11017:
URL: https://github.com/apache/kafka/pull/11017


   Development of EasyMock and PowerMock has stagnated while Mockito continues to be actively developed. With the new Java cadence, it's a problem to depend on libraries that do bytecode generation and are not actively maintained. In addition, Mockito is also easier to use. [KAFKA-7438](https://issues.apache.org/jira/browse/KAFKA-12950)
   
   `build.gradle` will be updated after the other related issues are merged, to avoid conflicts. This PR makes minor changes to the source code; I think this is inevitable. If there is a better solution that avoids these changes, suggestions are welcome.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#issuecomment-913720572


   @wycccccc Can you please fix the conflict? @chia7712 How do you feel about this PR, is it close to being ready to be merged?





[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667442355



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,6 +939,11 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    //For KafkaStreamsTest

Review comment:
       I have updated the code; thanks for all your comments.







[GitHub] [kafka] ijuma commented on pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#issuecomment-913789472


   Btw, this is the only Streams test that is disabled with Java 16 and newer:
   https://github.com/apache/kafka/blob/trunk/build.gradle#L389





[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667443023



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -96,13 +94,7 @@
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 
-import static org.easymock.EasyMock.anyBoolean;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.easymock.EasyMock.anyInt;

Review comment:
       I am worried that the current changes to `build.gradle` will cause conflicts. I am going to prepare a separate PR after all related issues are merged.







[GitHub] [kafka] chia7712 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r705913852



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -977,6 +977,11 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    // Ensure Mockito stub construct with capture argument for KafkaStreamsTest.
+    public static Metrics createThisMetrics(final MetricConfig metricConfig, final List<MetricsReporter> reporters, final Time time, final MetricsContext metricsContext) {

Review comment:
       Why do we need the word "this"?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return isStateAlive();
         }
     }
 
+    // Ensure Mockito can stub method for KafkaStreamTest.
+    public boolean isStateAlive() {

Review comment:
       Personally, I consider this an anti-pattern. There are two public methods returning the same "value", but only one of them holds the lock, so it is hard to use them correctly. At any rate, this is unrelated to this PR.
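
A minimal sketch of the concern above, using only the JDK. `TwoAccessorsSketch` and `finishesWhileLockHeld` are hypothetical names invented for illustration and are not the real `StreamThread`; only the shape of the two accessors (`isRunning` taking `stateLock`, `isStateAlive` not taking it) follows the diff.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for StreamThread; only the locking shape matters here.
final class TwoAccessorsSketch {
    private final Object stateLock = new Object();
    private volatile boolean alive = true;

    // Locked accessor: must acquire stateLock before reading the state.
    public boolean isRunning() {
        synchronized (stateLock) {
            return isStateAlive();
        }
    }

    // Unlocked accessor: returns the same value without touching stateLock.
    public boolean isStateAlive() {
        return alive;
    }

    // Holds stateLock on the calling thread, runs the accessor on a second
    // thread, and reports whether that thread finished within timeoutMillis.
    public boolean finishesWhileLockHeld(final BooleanSupplier accessor, final long timeoutMillis) {
        final Thread caller = new Thread(accessor::getAsBoolean);
        caller.setDaemon(true);
        synchronized (stateLock) {
            caller.start();
            try {
                caller.join(timeoutMillis);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return !caller.isAlive();
        }
    }

    public static void main(final String[] args) {
        final TwoAccessorsSketch thread = new TwoAccessorsSketch();
        System.out.println("isRunning finished: " + thread.finishesWhileLockHeld(thread::isRunning, 200));
        System.out.println("isStateAlive finished: " + thread.finishesWhileLockHeld(thread::isStateAlive, 2000));
    }
}
```

While another thread holds `stateLock`, the `isRunning` call is expected to stay blocked and the `isStateAlive` call to return immediately, even though both report the same value. A caller therefore has to know which of the two guarantees it needs.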

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);

Review comment:
       Please add a `verify` call to make sure `rocksDBMetricsRecordingTriggerThread` is used to create the thread in this test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();

Review comment:
       It seems to me this check is a no-op, since the thread has not been used yet at this point. Hence, the number of recorded invocations is always 0.
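
The point can be illustrated without Mockito. The sketch below is a hand-rolled stand-in, not the Mockito API: `CountingThread`, `checkAtMostOnce`, `prepare`, and `checkFailsAfter` are hypothetical names, and the check only mimics an "at most once" verification against the calls recorded so far.

```java
// Hand-rolled stand-in for a mock; this is not Mockito, only an illustration.
final class EarlyVerifySketch {

    static final class CountingThread {
        private int calls = 0;

        boolean isStateAlive() {
            calls++;
            return true;
        }

        // Mimics an "at most once" check against the calls recorded so far.
        void checkAtMostOnce() {
            if (calls > 1) {
                throw new AssertionError("isStateAlive() was called " + calls + " times");
            }
        }
    }

    // Setup-time check: no call has been recorded yet, so it always passes.
    static CountingThread prepare() {
        final CountingThread thread = new CountingThread();
        thread.checkAtMostOnce();
        return thread;
    }

    // Returns true if a check made after `uses` calls rejects the call count.
    static boolean checkFailsAfter(final int uses) {
        final CountingThread thread = prepare();
        for (int i = 0; i < uses; i++) {
            thread.isStateAlive();
        }
        try {
            thread.checkAtMostOnce();
            return false;
        } catch (final AssertionError expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println("fails after 1 use: " + checkFailsAfter(1));
        System.out.println("fails after 2 uses: " + checkFailsAfter(2));
    }
}
```

The check inside `prepare` can never fail because it runs before any call is recorded. Only a check placed after the code under test has run, as in `checkFailsAfter`, can detect a second call.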

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();
+        when(thread.getName()).thenReturn("processId-StreamThread-" + threadId);
+
+        doAnswer(invocation -> {
             supplier.consumer.close();
             supplier.restoreConsumer.close();
             for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
                 producer.close();
             }
             state.set(StreamThread.State.DEAD);
 
-            threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
+            threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
+            threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        }).anyTimes();
-        EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes();
-        thread.join();
+        }).when(thread).shutdown();
+
+        when(thread.isRunning()).thenReturn(state.get() == StreamThread.State.RUNNING);
+
         if (terminable) {

Review comment:
       If you don't want to check the `join` method, please remove this empty body.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -874,26 +868,26 @@ public void shouldNotBlockInCloseForZeroDuration() {
 
     @Test
     public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = EasyMock.niceMock(ScheduledExecutorService.class);
+        final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = mock(ScheduledExecutorService.class, withSettings().lenient());
         final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(rocksDBMetricsRecordingTriggerThread);
-        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
-            EasyMock.anyObject(RocksDBMetricsRecordingTrigger.class),
-            EasyMock.eq(0L),
-            EasyMock.eq(1L),
-            EasyMock.eq(TimeUnit.MINUTES)
-        )).andReturn(null);
-        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.shutdownNow()).andReturn(null);
-        PowerMock.replay(Executors.class);
-        PowerMock.replay(rocksDBMetricsRecordingTriggerThread);
-        PowerMock.replay(cleanupSchedule);
+            mock(ScheduledExecutorService.class);
+
+        executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor(
+            any(ThreadFactory.class)
+        )).thenReturn(cleanupSchedule);
+        executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor(

Review comment:
       Different `ThreadFactory` instances should get different `ScheduledExecutorService` instances (see the production code in `KafkaStreams`). That was a bug in the previous test. However, it was fine, since all this test wants to do is make sure `rocksDBMetricsRecordingTriggerThread` is executed.
   
   Could you follow the production code and return different `ScheduledExecutorService` instances, or add a comment saying "this is fine because ..."?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);
-        PowerMock.verify(rocksDBMetricsRecordingTriggerThread);
     }
 
     @Test
     public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = EasyMock.niceMock(ScheduledExecutorService.class);
-        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        PowerMock.replay(Executors.class, rocksDBMetricsRecordingTriggerThread, cleanupSchedule);
+        final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = mock(ScheduledExecutorService.class, withSettings().lenient());
+        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = mock(ScheduledExecutorService.class);

Review comment:
       Please make sure `rocksDBMetricsRecordingTriggerThread` is not used to create a thread in this test.
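
A rough, JDK-only sketch of what "used" versus "not used" means for the executor in these two tests. The recording proxy stands in for a mock, and `maybeStartRecording` is a hypothetical placeholder for the code under test, not the `KafkaStreams` implementation. In the PR itself the corresponding checks would presumably be written with Mockito's `verify` and `verifyNoInteractions`.

```java
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stdlib-only stand-in for a mocked executor; not Mockito and not Kafka code.
final class RecordingExecutorSketch {
    final List<String> calls = new CopyOnWriteArrayList<>();

    // Dynamic proxy that records the name of every method invoked on it.
    final ScheduledExecutorService executor = (ScheduledExecutorService) Proxy.newProxyInstance(
        RecordingExecutorSketch.class.getClassLoader(),
        new Class<?>[] {ScheduledExecutorService.class},
        (proxy, method, args) -> {
            calls.add(method.getName());
            final Class<?> returnType = method.getReturnType();
            if (returnType == boolean.class) {
                return false;
            }
            if (returnType == int.class) {
                return 0;
            }
            return null;
        });

    // Hypothetical code under test: schedules the trigger only at DEBUG level.
    static void maybeStartRecording(final ScheduledExecutorService trigger, final boolean debug) {
        if (debug) {
            trigger.scheduleAtFixedRate(() -> { }, 0L, 1L, TimeUnit.MINUTES);
        }
    }

    public static void main(final String[] args) {
        final RecordingExecutorSketch debug = new RecordingExecutorSketch();
        maybeStartRecording(debug.executor, true);
        System.out.println("DEBUG calls: " + debug.calls);

        final RecordingExecutorSketch info = new RecordingExecutorSketch();
        maybeStartRecording(info.executor, false);
        System.out.println("INFO calls: " + info.calls);
    }
}
```

The DEBUG test would then assert that `scheduleAtFixedRate` was recorded, and the INFO test that nothing was recorded at all.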

##########
File path: build.gradle
##########
@@ -1751,8 +1749,6 @@ project(':streams') {
     testImplementation libs.junitJupiterApi
     testImplementation libs.junitVintageEngine
     testImplementation libs.easymock
-    testImplementation libs.powermockJunit4

Review comment:
       `TableSourceNodeTest` is still using powermock







[GitHub] [kafka] wycccccc commented on pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#issuecomment-913789041


   @chia7712 The conflict has been resolved; let me know if you have any concerns.





[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667443055



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -61,18 +59,18 @@
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.After;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;

Review comment:
       ditto







[GitHub] [kafka] ijuma commented on pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#issuecomment-916526936


   @chia7712 Conflicts are resolved. :)





[GitHub] [kafka] chia7712 commented on pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#issuecomment-913747482


   > How do you feel about this PR, is it close to being ready to be merged?
   
   I need to take another look after those conflicts are fixed.





[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667442158



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return getIsAlive();
         }
     }
 
+    //For KafkaStreamTest
+    public boolean getIsAlive() {

Review comment:
       copy that







[GitHub] [kafka] ijuma commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r703026597



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -111,9 +104,24 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.MockitoAnnotations.openMocks;
 
-@RunWith(PowerMockRunner.class)

Review comment:
       Are there other Streams tests using `PowerMock`? If not, we should also update the build dependency.







[GitHub] [kafka] tang7526 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667511730



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,7 +939,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
-    //For KafkaStreamsTest
+    //Ensure Mockito stub construct with capture argument for KafkaStreamsTest.

Review comment:
       Could you add a space between `//` and `Ensure`? 







[GitHub] [kafka] chia7712 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667361897



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return getIsAlive();
         }
     }
 
+    //For KafkaStreamTest
+    public boolean getIsAlive() {

Review comment:
       This naming is a bit weird to me. How about `isStateAlive`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -96,13 +94,7 @@
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 
-import static org.easymock.EasyMock.anyBoolean;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.easymock.EasyMock.anyInt;

Review comment:
       Is `easymock` still used by streams module? If not, could you remove it from `build.gradle`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return getIsAlive();
         }
     }
 
+    //For KafkaStreamTest

Review comment:
       ditto. Could you add more description?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -182,152 +192,138 @@ public void before() throws Exception {
         prepareStreams();
     }
 
+    @After
+    public void tearDown() {
+        kafkaStreamsMockedStatic.close();
+        clientMetricsMockedStatic.close();
+        streamThreadMockedStatic.close();
+        globalStreamThreadMockedConstruction.close();
+    }
+
     private void prepareStreams() throws Exception {
         // setup metrics
-        PowerMock.expectNew(Metrics.class,
-            anyObject(MetricConfig.class),
-            capture(metricsReportersCapture),
-            anyObject(Time.class),
-            anyObject(MetricsContext.class)
-        ).andAnswer(() -> {
+        kafkaStreamsMockedStatic = mockStatic(KafkaStreams.class, withSettings()
+                .defaultAnswer(InvocationOnMock::callRealMethod));
+        kafkaStreamsMockedStatic.when(() -> KafkaStreams.createThisMetrics(
+                any(MetricConfig.class),
+                metricsReportersCapture.capture(),
+                any(Time.class),
+                any(MetricsContext.class)
+        )).thenAnswer(invocation -> {
             for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
                 reporter.init(Collections.emptyList());
             }
             return metrics;
-        }).anyTimes();
-        metrics.close();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        });
+
+        doAnswer(invocation -> {
             for (final MetricsReporter reporter : metricsReportersCapture.getValue()) {
                 reporter.close();
             }
             return null;
-        }).anyTimes();
-
-        PowerMock.mockStatic(ClientMetrics.class);
-        EasyMock.expect(ClientMetrics.version()).andReturn("1.56");
-        EasyMock.expect(ClientMetrics.commitId()).andReturn("1a2b3c4d5e");
-        ClientMetrics.addVersionMetric(anyObject(StreamsMetricsImpl.class));
-        ClientMetrics.addCommitIdMetric(anyObject(StreamsMetricsImpl.class));
-        ClientMetrics.addApplicationIdMetric(anyObject(StreamsMetricsImpl.class), EasyMock.eq(APPLICATION_ID));
-        ClientMetrics.addTopologyDescriptionMetric(anyObject(StreamsMetricsImpl.class), anyString());
-        ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), anyObject());
-        ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class), anyObject());
+        }).when(metrics).close();
+
+        clientMetricsMockedStatic = mockStatic(ClientMetrics.class);
+        clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
+        clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
+        ClientMetrics.addVersionMetric(any(StreamsMetricsImpl.class));
+        ClientMetrics.addCommitIdMetric(any(StreamsMetricsImpl.class));
+        ClientMetrics.addApplicationIdMetric(any(StreamsMetricsImpl.class), eq(APPLICATION_ID));
+        ClientMetrics.addTopologyDescriptionMetric(any(StreamsMetricsImpl.class), any());
+        ClientMetrics.addStateMetric(any(StreamsMetricsImpl.class), any());
+        ClientMetrics.addNumAliveStreamThreadMetric(any(StreamsMetricsImpl.class), any());
 
         // setup stream threads
-        PowerMock.mockStatic(StreamThread.class);
-        EasyMock.expect(StreamThread.create(
-            anyObject(InternalTopologyBuilder.class),
-            anyObject(StreamsConfig.class),
-            anyObject(KafkaClientSupplier.class),
-            anyObject(Admin.class),
-            anyObject(UUID.class),
-            anyObject(String.class),
-            anyObject(StreamsMetricsImpl.class),
-            anyObject(Time.class),
-            anyObject(StreamsMetadataState.class),
-            anyLong(),
-            anyObject(StateDirectory.class),
-            anyObject(StateRestoreListener.class),
-            anyInt(),
-            anyObject(Runnable.class),
-            anyObject()
-        )).andReturn(streamThreadOne).andReturn(streamThreadTwo);
-
-        EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
-        EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
-        EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
-        EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
+        streamThreadMockedStatic = mockStatic(StreamThread.class);
+        streamThreadMockedStatic.when(() -> StreamThread.create(
+                any(InternalTopologyBuilder.class),
+                any(StreamsConfig.class),
+                any(KafkaClientSupplier.class),
+                any(Admin.class),
+                any(UUID.class),
+                any(String.class),
+                any(StreamsMetricsImpl.class),
+                any(Time.class),
+                any(StreamsMetadataState.class),
+                anyLong(),
+                any(StateDirectory.class),
+                any(StateRestoreListener.class),
+                anyInt(),
+                any(Runnable.class),
+                any()
+        )).thenReturn(streamThreadOne).thenReturn(streamThreadTwo);
+
+        streamThreadMockedStatic.when(() -> StreamThread.eosEnabled(any(StreamsConfig.class))).thenReturn(false);
+        streamThreadMockedStatic.when(() -> StreamThread.processingMode(any(StreamsConfig.class))).thenReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE);
+        when(streamThreadOne.getId()).thenReturn(1L);
+        when(streamThreadTwo.getId()).thenReturn(2L);
+
         prepareStreamThread(streamThreadOne, 1, true);
         prepareStreamThread(streamThreadTwo, 2, false);
 
         // setup global threads
         final AtomicReference<GlobalStreamThread.State> globalThreadState = new AtomicReference<>(GlobalStreamThread.State.CREATED);
-        PowerMock.expectNew(GlobalStreamThread.class,
-            anyObject(ProcessorTopology.class),
-            anyObject(StreamsConfig.class),
-            anyObject(Consumer.class),
-            anyObject(StateDirectory.class),
-            anyLong(),
-            anyObject(StreamsMetricsImpl.class),
-            anyObject(Time.class),
-            anyString(),
-            anyObject(StateRestoreListener.class),
-            anyObject(StreamsUncaughtExceptionHandler.class)
-        ).andReturn(globalStreamThread).anyTimes();
-        EasyMock.expect(globalStreamThread.state()).andAnswer(globalThreadState::get).anyTimes();
-        globalStreamThread.setStateListener(capture(threadStatelistenerCapture));
-        EasyMock.expectLastCall().anyTimes();
-
-        globalStreamThread.start();
-        EasyMock.expectLastCall().andAnswer(() -> {
-            globalThreadState.set(GlobalStreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.RUNNING,
-                GlobalStreamThread.State.CREATED);
-            return null;
-        }).anyTimes();
-        globalStreamThread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
-            supplier.restoreConsumer.close();
 
-            for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
-                producer.close();
-            }
-            globalThreadState.set(GlobalStreamThread.State.DEAD);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.PENDING_SHUTDOWN,
-                GlobalStreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.DEAD,
-                GlobalStreamThread.State.PENDING_SHUTDOWN);
-            return null;
-        }).anyTimes();
-        EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING).anyTimes();
-        globalStreamThread.join();
-        EasyMock.expectLastCall().anyTimes();
-
-        PowerMock.replay(
-            StreamThread.class,
-            Metrics.class,
-            metrics,
-            ClientMetrics.class,
-            streamThreadOne,
-            streamThreadTwo,
-            GlobalStreamThread.class,
-            globalStreamThread
-        );
+        globalStreamThreadMockedConstruction = mockConstruction(GlobalStreamThread.class,
+                (mock, context) -> {
+                    when(mock.state()).thenAnswer(invocation -> globalThreadState.get());
+                    doNothing().when(mock).setStateListener(threadStateListenerCapture.capture());
+                    doAnswer(invocation -> {
+                        globalThreadState.set(GlobalStreamThread.State.RUNNING);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.RUNNING,
+                                GlobalStreamThread.State.CREATED);
+                        return null;
+                    }).when(mock).start();
+                    doAnswer(invocation -> {
+                        supplier.restoreConsumer.close();
+
+                        for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
+                            producer.close();
+                        }
+                        globalThreadState.set(GlobalStreamThread.State.DEAD);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.PENDING_SHUTDOWN,
+                                GlobalStreamThread.State.RUNNING);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.DEAD,
+                                GlobalStreamThread.State.PENDING_SHUTDOWN);
+                        return null;
+                    }).when(mock).shutdown();
+                    when(mock.stillRunning()).thenReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING);
+                });
+
     }
 
     private void prepareStreamThread(final StreamThread thread,
                                      final int threadId,
                                      final boolean terminable) throws Exception {
         final AtomicReference<StreamThread.State> state = new AtomicReference<>(StreamThread.State.CREATED);
-        EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
+        when(thread.state()).thenAnswer(invocation -> state.get());
 
-        thread.setStateListener(capture(threadStatelistenerCapture));
-        EasyMock.expectLastCall().anyTimes();
+        doNothing().when(thread).setStateListener(threadStateListenerCapture.capture());
 
-        EasyMock.expect(thread.getStateLock()).andReturn(new Object()).anyTimes();
+        when(thread.getStateLock()).thenReturn(new Object());
 
-        thread.start();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer(invocation -> {
             state.set(StreamThread.State.STARTING);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.STARTING,
-                StreamThread.State.CREATED);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.PARTITIONS_REVOKED,
-                StreamThread.State.STARTING);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.PARTITIONS_ASSIGNED,
-                StreamThread.State.PARTITIONS_REVOKED);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.RUNNING,
-                StreamThread.State.PARTITIONS_ASSIGNED);
+            threadStateListenerCapture.getValue().onChange(thread,
+                    StreamThread.State.STARTING,

Review comment:
       Is the extra indentation here redundant?
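
A note on the `stillRunning()` stub in the hunk above: `thenReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING)` evaluates its argument once, when the stub is configured, whereas the `state()` stub uses `thenAnswer` and re-reads `globalThreadState` on every call. The removed EasyMock line passed an already-computed value to `andReturn` in the same way, so the migration keeps the old behaviour; the thread does not say whether a fixed value is intended. The difference can be sketched in plain Java without Mockito. `EagerVersusLazyStub`, `eagerStub`, and `lazyStub` are hypothetical names used only for illustration, not Kafka or Mockito APIs.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration; this is neither Kafka nor Mockito code.
class EagerVersusLazyStub {
    enum State { CREATED, RUNNING, DEAD }

    // Analogue of thenReturn(value): the caller computes the value once,
    // before the stub exists, and the stub keeps returning that value.
    static Supplier<Boolean> eagerStub(final boolean value) {
        return () -> value;
    }

    // Analogue of thenAnswer(invocation -> ...): the expression is
    // re-evaluated each time the stub is called.
    static Supplier<Boolean> lazyStub(final AtomicReference<State> state) {
        return () -> state.get() == State.RUNNING;
    }

    public static void main(final String[] args) {
        final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

        // Both stubs are configured while the state is still CREATED.
        final Supplier<Boolean> eager = eagerStub(state.get() == State.RUNNING);
        final Supplier<Boolean> lazy = lazyStub(state);

        state.set(State.RUNNING);

        System.out.println("eager: " + eager.get()); // prints "eager: false"
        System.out.println("lazy: " + lazy.get());   // prints "lazy: true"
    }
}
```

If `stillRunning()` were meant to follow the state transitions, a `thenAnswer` stub like the one already used for `state()` would be the usual way to get that.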

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -61,18 +59,18 @@
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.After;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;

Review comment:
       Is `powermock` still used by the streams module? If not, could you remove it from build.gradle?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,6 +939,11 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    //For KafkaStreamsTest

Review comment:
       Why? Could you please add more details?







[GitHub] [kafka] ijuma commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r703026597



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -111,9 +104,24 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.MockitoAnnotations.openMocks;
 
-@RunWith(PowerMockRunner.class)

Review comment:
       Are there other Streams tests using `PowerMock`? If not, we should also update the module build dependencies.







[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667974480



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,7 +939,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
-    //For KafkaStreamsTest
+    //Ensure Mockito stub construct with capture argument for KafkaStreamsTest.

Review comment:
       I have changed it.







[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
wycccccc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r704545691



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -111,9 +104,24 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.MockitoAnnotations.openMocks;
 
-@RunWith(PowerMockRunner.class)

Review comment:
       As far as I remember, I have migrated all the Streams tests, and I have updated the module build dependencies.







[GitHub] [kafka] tang7526 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

Posted by GitBox <gi...@apache.org>.
tang7526 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667512077



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,12 +246,12 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return getIsAlive();
+            return isStateAlive();
         }
     }
 
-    //For KafkaStreamTest
-    public boolean getIsAlive() {
+    //Ensure Mockito can stub method for KafkaStreamTest.

Review comment:
       Could you add a space between // and Ensure?
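
For context, the hunk above keeps the liveness check in its own public method and has `isRunning()` delegate to it while holding the lock; the code comment says this is so Mockito can stub the method for the test. A minimal sketch of that kind of test seam follows, in plain Java with an anonymous subclass standing in for a Mockito stub. `SeamMethodDemo`, `WorkerThread`, and `stubbedDead` are hypothetical names, and the `alive` field is an assumption made only so the example is self-contained; the real `StreamThread` is not reproduced here.

```java
// Hypothetical illustration of a test seam; not the real StreamThread.
class SeamMethodDemo {

    static class WorkerThread {
        private final Object stateLock = new Object();
        private volatile boolean alive = true; // assumed field, for the sketch only

        // Production entry point: takes the lock, then delegates to the seam.
        public boolean isRunning() {
            synchronized (stateLock) {
                return isStateAlive();
            }
        }

        // Seam: kept as a separate overridable method so a test double
        // can replace only the liveness check.
        public boolean isStateAlive() {
            return alive;
        }
    }

    // Hand-written stand-in for a stub that reports the thread as not alive.
    static WorkerThread stubbedDead() {
        return new WorkerThread() {
            @Override
            public boolean isStateAlive() {
                return false;
            }
        };
    }

    public static void main(final String[] args) {
        System.out.println(new WorkerThread().isRunning()); // prints "true"
        System.out.println(stubbedDead().isRunning());      // prints "false"
    }
}
```

The locking in `isRunning()` stays in the real method, and only the delegated check is replaced, which is what makes a separate method convenient to stub.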



