Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/02 17:37:12 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12418: KAFKA-13414: Replace PowerMock/EasyMock with Mockito in connect.storage.KafkaOffsetBackingStoreTest

C0urante commented on code in PR #12418:
URL: https://github.com/apache/kafka/pull/12418#discussion_r934915520


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
         assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
         assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();

Review Comment:
   We need to retain this logic somehow; otherwise, we lose some pretty significant guarantees about things like the `KafkaBasedLog` being closed when `KafkaOffsetBackingStore::stop` is invoked.
   
   I dealt with a similar issue in https://github.com/apache/kafka/pull/12409; you might find some of the techniques there useful. Or you might come up with something even better :)
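
To make the concern concrete, here is a minimal sketch of the guarantee that the removed `PowerMock.verifyAll()` call was providing: stopping the store must also stop the underlying log. It is deliberately written without Mockito so that it is self-contained; `OffsetLog`, `RecordingLog` and `SketchOffsetStore` are hypothetical stand-ins, not the real Kafka Connect classes. In the migrated test the same check would presumably be a Mockito `verify(...)` call on the mocked `storeLog` after `store.stop()`.

```java
// Hypothetical stand-in for the log that the store delegates to.
interface OffsetLog {
    void start();
    void stop();
}

// Hand-written fake that records how often each lifecycle method was called.
class RecordingLog implements OffsetLog {
    int startCalls = 0;
    int stopCalls = 0;

    @Override
    public void start() {
        startCalls++;
    }

    @Override
    public void stop() {
        stopCalls++;
    }
}

// Simplified store: its lifecycle methods only forward to the log.
class SketchOffsetStore {
    private final OffsetLog log;

    SketchOffsetStore(OffsetLog log) {
        this.log = log;
    }

    void start() {
        log.start();
    }

    void stop() {
        log.stop();
    }
}

class StopVerificationSketch {
    // Runs the start/stop lifecycle and returns the fake for inspection.
    static RecordingLog runLifecycle() {
        RecordingLog log = new RecordingLog();
        SketchOffsetStore store = new SketchOffsetStore(log);
        store.start();
        store.stop();
        return log;
    }

    public static void main(String[] args) {
        RecordingLog log = runLifecycle();
        // Without this check, a store that forgets to stop its log would still pass.
        if (log.stopCalls != 1) {
            throw new AssertionError("expected the log to be stopped exactly once");
        }
    }
}
```

The point is only that the test has to assert the interaction explicitly; calling `store.stop()` as clean-up does not check anything by itself.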



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -103,36 +103,57 @@ public class KafkaOffsetBackingStoreTest {
     private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
     private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
 
-    @Mock
-    KafkaBasedLog<byte[], byte[]> storeLog;
-    private KafkaOffsetBackingStore store;
-
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
-    private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+    @SuppressWarnings("unchecked")

Review Comment:
   Can we use the `@Mock` annotation for this and others?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
         assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
         assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectConfigure();
-        expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty())
-        ));
-        expectStop();
-        expectClusterId();
-
-        PowerMock.replayAll();
+        /* Setup */
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            return null;
+        }).when(storeLog).start();
+
+        /* Execution */
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
-        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+
+        /* Assertions */
+
+        // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field.
+        List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class,
+                field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN);
+        // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field
+        // as there should be exactly one.

Review Comment:
   Could we do away with reflection and just make `data` package-private? We use that approach in lots of other places in the code base, like here: https://github.com/apache/kafka/blob/5a2f28ce305183808d5932d44314f10b9beb21c5/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L210-L212
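
A rough sketch of what the package-private approach looks like, under simplifying assumptions: `SketchStore` and `SketchStoreCheck` are hypothetical names, and `String` keys and values stand in for the `ByteBuffer` ones used in the real test. Because the test class lives in the same package as the class under test, it can read the field directly, with no reflection and no unchecked cast.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the class under test.
class SketchStore {
    // Visible for testing: package-private instead of private.
    final Map<String, String> data = new HashMap<>();

    // Later records for the same key overwrite earlier ones.
    void onRecord(String key, String value) {
        data.put(key, value);
    }
}

// Stand-in for a test class in the same package as SketchStore.
class SketchStoreCheck {
    static SketchStore reload() {
        SketchStore store = new SketchStore();
        store.onRecord("TP0_KEY", "VAL0");
        store.onRecord("TP1_KEY", "VAL1");
        store.onRecord("TP0_KEY", "VAL0_NEW");
        store.onRecord("TP1_KEY", "VAL1_NEW");
        return store;
    }

    public static void main(String[] args) {
        SketchStore store = reload();
        // Direct field access replaces the ReflectionSupport lookup.
        if (!"VAL0_NEW".equals(store.data.get("TP0_KEY"))) {
            throw new AssertionError("expected the newest value for TP0_KEY");
        }
    }
}
```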



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java:
##########
@@ -142,97 +163,90 @@ public void testStartStop() throws Exception {
         assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
         assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
 
-        store.start();
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectConfigure();
-        expectStart(Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()),
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty())
-        ));
-        expectStop();
-        expectClusterId();
-
-        PowerMock.replayAll();
+        /* Setup */
+
+        doAnswer(invocation -> {
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                    new RecordHeaders(), Optional.empty()));
+            return null;
+        }).when(storeLog).start();
+
+        /* Execution */
 
         store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
-        HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
+
+        /* Assertions */
+
+        // Unless the implementation of KafkaOffsetBackingStore changes there should only ever be 1 data field.
+        List<Field> boxedDataField = ReflectionSupport.findFields(KafkaOffsetBackingStore.class,
+                field -> "data".equals(field.getName()), HierarchyTraversalMode.TOP_DOWN);
+        // Unless the implementation of KafkaOffsetBackingStore changes it should be safe to get the data field
+        // as there should be exactly one.
+        @SuppressWarnings("unchecked")
+        HashMap<ByteBuffer, ByteBuffer> data = (HashMap<ByteBuffer, ByteBuffer>) ReflectionSupport.tryToReadFieldValue(
+                boxedDataField.get(0), store).get();
         assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
         assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
 
-        store.stop();
+        /* Clean up */
 
-        PowerMock.verifyAll();
+        store.stop();
     }
 
     @Test
     public void testGetSet() throws Exception {
-        expectConfigure();
-        expectStart(Collections.emptyList());
-        expectStop();
-
-        // First get() against an empty store
-        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
-            firstGetReadToEndCallback.getValue().onCompletion(null, null);
-            return null;
-        });
-
-        // Set offsets
-        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
-        PowerMock.expectLastCall();
-        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
-        PowerMock.expectLastCall();
+        /* Setup */
 
-        // Second get() should get the produced data and return the new values
-        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> storeLogCallbackArgumentCaptor = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            // First get() against an empty store
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
+            return null;
+        }).
+        doAnswer(invocation -> {
+            // Second get() should get the produced data and return the new values
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()));
+                    new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
-                    new RecordHeaders(), Optional.empty()));
-            secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                    new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
             return null;
-        });
-
-        // Third get() should pick up data produced by someone else and return those values
-        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
-        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
-        PowerMock.expectLastCall().andAnswer(() -> {
+        }).doAnswer(invocation -> {
+            // Third get() should pick up data produced by someone else and return those values
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()));
+                    new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
+                            new RecordHeaders(), Optional.empty()));
             capturedConsumedCallback.getValue().onCompletion(null,
-                new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
-                    new RecordHeaders(), Optional.empty()));
-            thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+                    new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
+                            new RecordHeaders(), Optional.empty()));
+            storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
             return null;
-        });
+        }).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());

Review Comment:
   We don't have to set up the expectations for all three calls to `KafkaOffsetBackingStore::get` at once. We can set up the expectation for each right before we invoke the method.
   
   This has a few advantages:
   - Cleanliness (the chained `doAnswer` method invocations are a little unintuitive)
   - Readability (placing expectation-setting code closer to execution code makes it easier to see how the two are related)
   
   In general, I think we should try to follow Mockito idioms more closely with these tests by establishing expectations as late as possible in tests, instead of having a monolithic expectation-setting section at the beginning of each test case.
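
As an illustration of the "expectations as late as possible" layout, here is a small sketch that uses a hand-written scripted fake instead of Mockito so that it runs without extra dependencies. `ScriptedLog` and `LateStubbingSketch` are hypothetical names; in the real test each `onNextReadToEnd(...)` call would correspond to its own `doAnswer(...).when(storeLog).readToEnd(...)` placed directly before the matching `get`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fake whose next readToEnd() behaviour can be swapped at any time.
class ScriptedLog {
    private Runnable nextReadToEnd = () -> { };

    void onNextReadToEnd(Runnable behaviour) {
        nextReadToEnd = behaviour;
    }

    void readToEnd() {
        nextReadToEnd.run();
    }
}

class LateStubbingSketch {
    static List<String> run() {
        ScriptedLog log = new ScriptedLog();
        List<String> seen = new ArrayList<>();

        // First get(): empty store. The behaviour is set right before the call.
        log.onNextReadToEnd(() -> seen.add("empty"));
        log.readToEnd();

        // Second get(): the data this store produced shows up.
        log.onNextReadToEnd(() -> seen.add("VAL0"));
        log.readToEnd();

        // Third get(): data produced by someone else shows up.
        log.onNextReadToEnd(() -> seen.add("VAL0_NEW"));
        log.readToEnd();

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each behaviour sits next to the call it affects, so a reader does not have to match the second link of a chain to the second invocation further down the test.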


