Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/29 14:29:56 UTC

[GitHub] [kafka] tombentley commented on a change in pull request #11450: KAFKA-13414: Replace Powermock/EasyMock by Mockito in connect.storage

tombentley commented on a change in pull request #11450:
URL: https://github.com/apache/kafka/pull/11450#discussion_r758385491



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.java
##########
@@ -185,82 +181,64 @@ public void deleteTopicStatus() {
     public void putTopicState() {
         TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
         String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
-        Capture<byte[]> valueCapture = newCapture();
-        Capture<Callback> callbackCapture = newCapture();
-        kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
-        expectLastCall()
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, null);
-                    return null;
-                });
-        replayAll();
+        ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
+        doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, null);
+            return null;
+        }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
 
         store.put(topicStatus);
         // check capture state
-        assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+        assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
         // state is not visible until read back from the log
         assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
 
-        ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCapture.getValue());
+        ConsumerRecord<String, byte[]> statusRecord = new ConsumerRecord<>(STATUS_TOPIC, 0, 0, key, valueCaptor.getValue());
         store.read(statusRecord);
         assertEquals(topicStatus, store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
-        assertEquals(new HashSet<>(Collections.singletonList(topicStatus)), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
-
-        verifyAll();
+        assertEquals(Collections.singleton(topicStatus), new HashSet<>(store.getAllTopics(FOO_CONNECTOR)));
     }
 
     @Test
     public void putTopicStateRetriableFailure() {
         TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), time.milliseconds());
         String key = TOPIC_STATUS_PREFIX + FOO_TOPIC + TOPIC_STATUS_SEPARATOR + FOO_CONNECTOR;
-        Capture<byte[]> valueCapture = newCapture();
-        Capture<Callback> callbackCapture = newCapture();
-        kafkaBasedLog.send(eq(key), capture(valueCapture), capture(callbackCapture));
-        expectLastCall()
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, new TimeoutException());
-                    return null;
-                })
-                .andAnswer(() -> {
-                    callbackCapture.getValue().onCompletion(null, null);
-                    return null;
-                });
-
-        replayAll();
+
+        ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
+        doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, new TimeoutException());
+            return null;
+        }).doAnswer(invocation -> {
+            ((Callback) invocation.getArgument(2)).onCompletion(null, null);
+            return null;
+        }).when(kafkaBasedLog).send(eq(key), valueCaptor.capture(), any(Callback.class));
+
         store.put(topicStatus);
 
         // check capture state
-        assertEquals(topicStatus, store.parseTopicStatus(valueCapture.getValue()));
+        assertEquals(topicStatus, store.parseTopicStatus(valueCaptor.getValue()));
         // state is not visible until read back from the log
         assertNull(store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
-
-        verifyAll();

Review comment:
       I think the assertion on line 219 would pass even if the first mocked interaction never happened. Do we need something to tighten up the expected behaviour? Maybe something like:
   
   ```java
   verify(kafkaBasedLog, times(2)).send(any(), any(), any());
   ```
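To illustrate the concern, here is a minimal plain-Java sketch (no Mockito, so it runs standalone). It assumes the store retries by calling `send` again when the callback reports an error. `SendCallback`, `FakeLog`, and `RetryingStore` are hypothetical stand-ins, not the real Kafka Connect classes. Like `ArgumentCaptor.getValue()`, which returns the most recently captured argument, checking only the last sent value cannot distinguish one call from two; only the call count can.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the callback passed to send(); not the real Kafka interface.
interface SendCallback {
    void onCompletion(Exception error);
}

// Hand-rolled fake log: fails the first N sends, then succeeds, recording every value sent.
class FakeLog {
    final List<byte[]> sentValues = new ArrayList<>();
    private int remainingFailures;

    FakeLog(int failuresBeforeSuccess) {
        this.remainingFailures = failuresBeforeSuccess;
    }

    void send(String key, byte[] value, SendCallback callback) {
        sentValues.add(value);
        if (remainingFailures > 0) {
            remainingFailures--;
            callback.onCompletion(new RuntimeException("simulated retriable failure"));
        } else {
            callback.onCompletion(null);
        }
    }

    // Mirrors "last captured value" semantics.
    byte[] lastValue() {
        return sentValues.get(sentValues.size() - 1);
    }
}

// Hypothetical store that resends the same record whenever the callback reports an error.
class RetryingStore {
    private final FakeLog log;

    RetryingStore(FakeLog log) {
        this.log = log;
    }

    void put(String key, byte[] value) {
        log.send(key, value, error -> {
            if (error != null) {
                put(key, value); // retry on failure
            }
        });
    }
}

class RetryCountSketch {
    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};

        FakeLog failsOnce = new FakeLog(1);
        new RetryingStore(failsOnce).put("key", payload);

        FakeLog neverFails = new FakeLog(0);
        new RetryingStore(neverFails).put("key", payload);

        // The "last value" check passes in both scenarios...
        System.out.println(Arrays.equals(payload, failsOnce.lastValue()));
        System.out.println(Arrays.equals(payload, neverFails.lastValue()));
        // ...and only the call count tells them apart.
        System.out.println(failsOnce.sentValues.size());
        System.out.println(neverFails.sentValues.size());
    }
}
```

In the Mockito test, the suggested `verify(kafkaBasedLog, times(2))` plays the role of the call-count check here.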

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
##########
@@ -144,74 +126,60 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // such that a subsequent flush will write them.
 
         @SuppressWarnings("unchecked")
-        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        final Callback<Void> callback = mock(Callback.class);
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
-        // Second time it succeeds
-        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
-        // Third time it has no data to flush so we won't get past beginFlush()
 
-        PowerMock.replayAll();
+        // Third time it has no data to flush so we won't get past beginFlush()

Review comment:
       Now that the `// Second time it succeeds` comment has been removed, it's a little bit weird to just have comments about the first and third time followed by one about `// Second time it succeeds`.



