Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/10 06:49:25 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r705913852



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -977,6 +977,11 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    // Ensure Mockito stub construct with capture argument for KafkaStreamsTest.
+    public static Metrics createThisMetrics(final MetricConfig metricConfig, final List<MetricsReporter> reporters, final Time time, final MetricsContext metricsContext) {

Review comment:
       Why does the method name need the word "this"?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return isStateAlive();
         }
     }
 
+    // Ensure Mockito can stub method for KafkaStreamTest.
+    public boolean isStateAlive() {

Review comment:
       Personally, I consider this an anti-pattern. There are two public methods returning the same "value", but only one of them holds the lock, which makes them hard to use correctly. At any rate, this is unrelated to this PR.
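
   To make the concern concrete, here is a minimal sketch in plain Java. It is not Kafka code: `Worker`, its `State` enum, and the method bodies are hypothetical and only mirror the shape of the diff above. It shows one possible alternative, where the lock-holding method is the only public read path and the unlocked read stays private. It does not address how a test in another package would stub the helper.

```java
// Hypothetical sketch, not Kafka code: all names are illustrative only.
final class Worker {
    enum State {
        CREATED, RUNNING, DEAD;

        // Assumption for this sketch: only RUNNING counts as alive.
        boolean isAlive() {
            return this == RUNNING;
        }
    }

    private final Object stateLock = new Object();
    private State state = State.CREATED;

    // The only public read path: it always takes the lock.
    public boolean isRunning() {
        synchronized (stateLock) {
            return isStateAlive();
        }
    }

    // Unlocked helper kept private so callers cannot bypass the lock by accident.
    private boolean isStateAlive() {
        return state.isAlive();
    }

    // Writers take the same lock as the public reader.
    public void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }
}
```

   With a single public accessor, callers have no way to choose the unlocked variant by mistake.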

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);

Review comment:
       Please add a `verify` to make sure `rocksDBMetricsRecordingTriggerThread` is used to create the thread in this test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();

Review comment:
       It seems to me this check is a no-op, since the thread mock has not been used yet at this point. Hence, the invocation count is always 0.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class), anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();
+        when(thread.getName()).thenReturn("processId-StreamThread-" + threadId);
+
+        doAnswer(invocation -> {
             supplier.consumer.close();
             supplier.restoreConsumer.close();
             for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
                 producer.close();
             }
             state.set(StreamThread.State.DEAD);
 
-            threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
+            threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
+            threadStateListenerCapture.getValue().onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        }).anyTimes();
-        EasyMock.expect(thread.isRunning()).andReturn(state.get() == StreamThread.State.RUNNING).anyTimes();
-        thread.join();
+        }).when(thread).shutdown();
+
+        when(thread.isRunning()).thenReturn(state.get() == StreamThread.State.RUNNING);
+
         if (terminable) {

Review comment:
       If you don't want to check the `join` method, please remove this empty body.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -874,26 +868,26 @@ public void shouldNotBlockInCloseForZeroDuration() {
 
     @Test
     public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = EasyMock.niceMock(ScheduledExecutorService.class);
+        final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = mock(ScheduledExecutorService.class, withSettings().lenient());
         final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(rocksDBMetricsRecordingTriggerThread);
-        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
-            EasyMock.anyObject(RocksDBMetricsRecordingTrigger.class),
-            EasyMock.eq(0L),
-            EasyMock.eq(1L),
-            EasyMock.eq(TimeUnit.MINUTES)
-        )).andReturn(null);
-        EasyMock.expect(rocksDBMetricsRecordingTriggerThread.shutdownNow()).andReturn(null);
-        PowerMock.replay(Executors.class);
-        PowerMock.replay(rocksDBMetricsRecordingTriggerThread);
-        PowerMock.replay(cleanupSchedule);
+            mock(ScheduledExecutorService.class);
+
+        executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor(
+            any(ThreadFactory.class)
+        )).thenReturn(cleanupSchedule);
+        executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor(

Review comment:
       Different `ThreadFactory` instances should get different `ScheduledExecutorService` instances (see the production code in `KafkaStreams`). That was a bug in the previous test. However, it was fine, since all this test wants to do is make sure `rocksDBMetricsRecordingTriggerThread` is executed.
   
   Could you follow the production code and return different `ScheduledExecutorService` instances, or add a comment saying "this is fine because ..."?
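
   As a rough illustration of returning a different executor per factory, here is a sketch in plain Java with no mocking library. `ExecutorByFactory`, `select`, and the thread-name marker passed to it are hypothetical. The sketch assumes that the two factories give their threads distinguishable names, which should be checked against `KafkaStreams` before relying on it. A stub answer could delegate to logic of this shape.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

// Hypothetical helper, not part of Kafka or Mockito.
final class ExecutorByFactory {
    private final String triggerNameMarker;
    private final ScheduledExecutorService cleanup;
    private final ScheduledExecutorService rocksDbTrigger;

    ExecutorByFactory(final String triggerNameMarker,
                      final ScheduledExecutorService cleanup,
                      final ScheduledExecutorService rocksDbTrigger) {
        this.triggerNameMarker = triggerNameMarker;
        this.cleanup = cleanup;
        this.rocksDbTrigger = rocksDbTrigger;
    }

    // Builds a probe thread (never started) and picks the executor by its name.
    // Assumption: the factory under test names its threads distinctively.
    ScheduledExecutorService select(final ThreadFactory factory) {
        final Thread probe = factory.newThread(() -> { });
        return probe.getName().contains(triggerNameMarker) ? rocksDbTrigger : cleanup;
    }
}
```

   The probe thread is only inspected, never started, so selecting an executor has no side effects beyond the factory call itself.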

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);
-        PowerMock.verify(rocksDBMetricsRecordingTriggerThread);
     }
 
     @Test
     public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = EasyMock.niceMock(ScheduledExecutorService.class);
-        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        PowerMock.replay(Executors.class, rocksDBMetricsRecordingTriggerThread, cleanupSchedule);
+        final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = mock(ScheduledExecutorService.class, withSettings().lenient());
+        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = mock(ScheduledExecutorService.class);

Review comment:
       Please make sure `rocksDBMetricsRecordingTriggerThread` is not used to create a thread in this test.

##########
File path: build.gradle
##########
@@ -1751,8 +1749,6 @@ project(':streams') {
     testImplementation libs.junitJupiterApi
     testImplementation libs.junitVintageEngine
     testImplementation libs.easymock
-    testImplementation libs.powermockJunit4

Review comment:
       `TableSourceNodeTest` is still using PowerMock.



