You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "hgeraldino (via GitHub)" <gi...@apache.org> on 2023/02/03 00:14:05 UTC

[GitHub] [kafka] hgeraldino opened a new pull request, #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

hgeraldino opened a new pull request, #13191:
URL: https://github.com/apache/kafka/pull/13191

   This PR is part of the larger initiative of migrating tests from EasyMock/PowerMock to Mockito](https://issues.apache.org/jira/browse/KAFKA-7438), and is a "blocker" of https://issues.apache.org/jira/browse/KAFKA-14659 (not really a blocker, but it'd be nice to write tests for that bug fix after this test class is migrated)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112474934


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());
     }
 
+    @SuppressWarnings("SameParameterValue")
     private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }
+        when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic));
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
         Set<String> created = Collections.singleton(topic);
         Set<String> existing = Collections.emptySet();
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
         Set<String> created = Collections.emptySet();
         Set<String> existing = Collections.singleton(topic);
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
     private void expectPreliminaryCalls() {
-        expectPreliminaryCalls(TOPIC);
-    }
+        expectConvertHeadersAndKeyValue(emptyHeaders());
+        expectApplyTransformationChain();
+    }
+
+    private void expectConvertHeadersAndKeyValue(Headers headers) {

Review Comment:
   Makes sense. IJ noticed that the parameter was always the same and suggested to just use the constant, but I'll change it back



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1420944086

   > It's also worth noting that there are unused stubbings in some of these tests, which should be failing the build but are not at the moment due to [KAFKA-14682](https://issues.apache.org/jira/browse/KAFKA-14682). You can find these unused stubbings by running `./gradlew :connect:runtime:test --tests AbstractWorkerSourceTaskTest` in your command line, or possibly by running the entire `AbstractWorkerSourceTaskTest` test suite in IntelliJ (which is how I discovered them). These unused stubbings should be removed before we merge the PR.
   
   I have a question about this, why didn't the GitHub builds fail with this when running the tests (or am I misinterpreting the output)? I wouldn't expect to locally build every pull request to carry out such checks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1097852548


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
     @Mock private ConnectorOffsetBackingStore offsetStore;
     @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerSourceTaskContext sourceTaskContext;
-    @MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   In order to retain the same guarantees we have currently w/r/t interactions with this class, can we add a call to `verifyNoMoreInteractions(statusListener);` in the `tearDown` method?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,93 +814,25 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(transformationChain.apply(any(SourceRecord.class)))
+            .thenAnswer((Answer<SourceRecord>) invocation -> invocation.getArgument(0));
+        when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
+            anyString()))
+            .thenAnswer((Answer<byte[]>) invocation -> {
+                String headerValue = invocation.getArgument(3, String.class);
+                return headerValue.getBytes(StandardCharsets.UTF_8);
+            });
+        when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
+            .thenReturn(SERIALIZED_KEY);
+        when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(RECORD_SCHEMA),
+            eq(RECORD)))
+            .thenReturn(SERIALIZED_RECORD);
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
-        if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
-
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
-
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
-
-        expectTaskGetTopic(anyTimes);
-
-        return sent;
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }

Review Comment:
   Why remove this method? It seems like now there's a lot more duplication in each test case that used to call it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112459731


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
     public void testSendRecordsConvertsData() {
         createWorkerTask();
 
-        List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+        List<SourceRecord> records = Collections.singletonList(
+            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+        );
 
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
         workerTask.toSend = records;
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
+
         assertEquals(SERIALIZED_KEY, sent.getValue().key());
         assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsPropagatesTimestamp() {
         final Long timestamp = System.currentTimeMillis();
-
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
-                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
-        );
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
-        workerTask.toSend = records;
+        workerTask.toSend = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
         assertEquals(timestamp, sent.getValue().timestamp());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsCorruptTimestamp() {
         final Long timestamp = -3L;
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
+        expectSendRecord(emptyHeaders());
+        expectTopicCreation(TOPIC);

Review Comment:
   Good catch. Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1098135416


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
     @Mock private ConnectorOffsetBackingStore offsetStore;
     @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerSourceTaskContext sourceTaskContext;
-    @MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1434672313

   Thanks for the thorough review @C0urante! 
   
   I'll get to it right away.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1420146327

   > Thanks @hgeraldino!
   > 
   > I'm a little confused by the amount of churn here--it seems like a lot of utility methods related to topic creation, record transformation, etc. have been removed and their contents inlined directly into test cases. If this isn't necessary for the migration, can we try to retain that approach in order to reduce duplication?
   > 
   > It's also worth noting that there are unused stubbings in some of these tests, which should be failing the build but are not at the moment due to [KAFKA-14682](https://issues.apache.org/jira/browse/KAFKA-14682). You can find these unused stubbings by running `./gradlew :connect:runtime:test --tests AbstractWorkerSourceTaskTest` in your command line, or possibly by running the entire `AbstractWorkerSourceTaskTest` test suite in IntelliJ (which is how I discovered them). These unused stubbings should be removed before we merge the PR.
   
   That's fair. Personally I prefer self-contained test methods hat can be read top-bottom (without having to jump around), even if it violates the DRY principle. But I'm ok on keeping the existing structure & style, so I put back the (revised) helper methods


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1422306889

   Oh, sorry, I don't know where I was looking when I read your response. Apologies for the back-and-forth.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1420949503

   @clolov I've provided some information on that front in the description for [the ticket I linked](https://issues.apache.org/jira/browse/KAFKA-14682). TL;DR: I believe it's because we use `./gradlew unitTest integrationTest` instead of `./gradlew test` in our Jenkins build, but I'm not certain of that yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112456303


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());

Review Comment:
   Makes sense. Moved the `createOrFindTopics` verification to a separate method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112464114


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First round
-        expectPreliminaryCalls(OTHER_TOPIC);
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
-
-        // First call to describe the topic times out
-        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
+        expectPreliminaryCalls();
 
-        // Second round
-        expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+        when(admin.describeTopics(anyString())).thenAnswer(new Answer<Map<String, TopicDescription>>() {
+            int counter = 0;
 
-        PowerMock.replayAll();
+            @Override
+            public Map<String, TopicDescription> answer(InvocationOnMock invocation) {
+                counter++;
+                if (counter == 2) {
+                    throw new RetriableException(new TimeoutException("timeout"));
+                }
 
-        // Try to send 3, make first pass, second fail. Should save last two
+                return Collections.emptyMap();
+            }
+        });

Review Comment:
   It certainly is more readable. 
   
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1116427030


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -523,34 +513,26 @@ public void testSendRecordsTopicCreateRetriesMidway() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First round
+        expectPreliminaryCalls(TOPIC);
         expectPreliminaryCalls(OTHER_TOPIC);
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
-
-        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
-        // First call to create the topic times out
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
 
-        // Second round
-        expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(anyString())).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+                .thenReturn(createdTopic(TOPIC))
+                .thenThrow(new RetriableException(new TimeoutException("timeout")))
+                .thenReturn(createdTopic(OTHER_TOPIC));
 
         // Try to send 3, make first pass, second fail. Should save last two

Review Comment:
   Nit: This comment was never accurate (not on the current trunk, nor in this PR); we actually just save the last one. Can we update it to "Should save last record" to match the other comment of its kind?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });
 
         workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   Nicely done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
     public void testSendRecordsConvertsData() {
         createWorkerTask();
 
-        List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
+        List<SourceRecord> records = Collections.singletonList(
+            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
+        );
 
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
         workerTask.toSend = records;
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
+
         assertEquals(SERIALIZED_KEY, sent.getValue().key());
         assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsPropagatesTimestamp() {
         final Long timestamp = System.currentTimeMillis();
-
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
-                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
-        );
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();
-
+        expectSendRecord(emptyHeaders());
         expectTopicCreation(TOPIC);
 
-        PowerMock.replayAll();
-
-        workerTask.toSend = records;
+        workerTask.toSend = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord();
         assertEquals(timestamp, sent.getValue().timestamp());
 
-        PowerMock.verifyAll();
+        verifyTaskGetTopic();
     }
 
     @Test
     public void testSendRecordsCorruptTimestamp() {
         final Long timestamp = -3L;
         createWorkerTask();
 
-        List<SourceRecord> records = Collections.singletonList(
+        expectSendRecord(emptyHeaders());
+        expectTopicCreation(TOPIC);

Review Comment:
   Why is this added? We're testing a scenario where the task fails on an invalid record timestamp, it should never get to the point of attempting to create a topic.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());

Review Comment:
   The old `expectTaskGetTopic` method is related to recording topic usage in the status backing store, not topic creation.
   
   This part should be pulled out into a separate method since the two are not necessarily related to each other.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });

Review Comment:
   There's an easier way to set up consecutive expectations in Mockito:
   ```suggestion
           when(admin.createOrFindTopics(any(NewTopic.class)))
                   // First call to create the topic times out
                   .thenThrow(new RetriableException(new TimeoutException("timeout")))
                   // Next attempt succeeds
                   .thenReturn(createdTopic(TOPIC));
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });
 
         workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   Two notes that I can't leave directly inline due to GitHub UI limitations:
   
   1. After line 466 (`assertEquals(Arrays.asList(record1, record2), workerTask.toSend);`), we should add a single call to `verifyTopicCreation()`
   2. After line 470 (`assertNull(workerTask.toSend);`), we should add a follow-up call to `verifyTopicCreation(times(2));`
   
   This assumes that `verifyTopicCreation` is broken up into two overloaded variants:
   ```java
       private void verifyTopicCreation() {
           verifyTopicCreation(times(1));
       }
   
       private void verifyTopicCreation(VerificationMode mode) {
           ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
           verify(admin, mode).createOrFindTopics(newTopicCapture.capture());
           assertEquals(TOPIC, newTopicCapture.getValue().name());
       }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());
     }
 
+    @SuppressWarnings("SameParameterValue")
     private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }
+        when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic));
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
         Set<String> created = Collections.singleton(topic);
         Set<String> existing = Collections.emptySet();
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
         Set<String> created = Collections.emptySet();
         Set<String> existing = Collections.singleton(topic);
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
     private void expectPreliminaryCalls() {
-        expectPreliminaryCalls(TOPIC);
-    }
+        expectConvertHeadersAndKeyValue(emptyHeaders());
+        expectApplyTransformationChain();
+    }
+
+    private void expectConvertHeadersAndKeyValue(Headers headers) {

Review Comment:
   This assumes that we'll always handle records whose topic is `TOPIC`, but that assumption doesn't hold in the `testSendRecordsTopicDescribeRetriesMidway` and `testSendRecordsTopicCreateRetriesMidway` cases, where we try to send records with the topic `OTHER_TOPIC`.
   
   Even though the tests are passing right now, it's important that we add expectations for converting records for the other topic since the strict stubbing we've configured Mockito with will fail the test if those expectations are not utilized, which effectively increases the coverage in these tests.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -485,32 +479,45 @@ public void testSendRecordsTopicDescribeRetriesMidway() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First round
-        expectPreliminaryCalls(OTHER_TOPIC);
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
-
-        // First call to describe the topic times out
-        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
+        expectPreliminaryCalls();
 
-        // Second round
-        expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
+        when(admin.describeTopics(anyString())).thenAnswer(new Answer<Map<String, TopicDescription>>() {
+            int counter = 0;
 
-        PowerMock.replayAll();
+            @Override
+            public Map<String, TopicDescription> answer(InvocationOnMock invocation) {
+                counter++;
+                if (counter == 2) {
+                    throw new RetriableException(new TimeoutException("timeout"));
+                }
 
-        // Try to send 3, make first pass, second fail. Should save last two
+                return Collections.emptyMap();
+            }
+        });

Review Comment:
   This can be simplified:
   
   ```java
           expectTopicCreation(TOPIC);
           when(admin.describeTopics(eq(OTHER_TOPIC)))
                   .thenThrow(new RetriableException(new TimeoutException("timeout")))
                   .thenReturn(Collections.emptyMap());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -425,17 +412,21 @@ public void testSendRecordsTopicDescribeRetries() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
         expectPreliminaryCalls();
-        // First round - call to describe the topic times out
-        EasyMock.expect(admin.describeTopics(TOPIC))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round - calls to describe and create succeed
         expectTopicCreation(TOPIC);

Review Comment:
   We shouldn't be establishing this expectation before invoking `workerTask::sendRecords` the first time, since we don't actually expect topic creation to take place then.
   
   I think this could be better structured by replacing lines 415 (`expectTopicCreation(TOPIC)`) through to the end of the test case with this:
   
   ```java
           // First round - call to describe the topic times out
           when(admin.describeTopics(TOPIC)).thenThrow(new RetriableException(new TimeoutException("timeout")));
   
           workerTask.toSend = Arrays.asList(record1, record2);
           workerTask.sendRecords();
           assertEquals(Arrays.asList(record1, record2), workerTask.toSend);
           verifyNoMoreInteractions(admin);
   
           reset(admin);
           // Second round - calls to describe and create succeed
           expectTopicCreation(TOPIC);
   
           workerTask.sendRecords();
           assertNull(workerTask.toSend);
           verifyTopicCreation();
   ```
   
   (assuming a `verifyTopicCreation()` method is added)



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -404,17 +391,17 @@ public void testTopicCreateWhenTopicExists() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
         expectPreliminaryCalls();
+
         TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
         TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
 
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        expectSendRecord(emptyHeaders());
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        verifySendRecord(2);

Review Comment:
   We need to make sure that, under these circumstances, we never tried to create a topic. One way to accomplish this is to verify that we never used the `TopicAdmin` for anything except the call to `describeTopics`:
   ```suggestion
           verifySendRecord(2);
           // Make sure we didn't try to create the topic after finding out it already existed
           verifyNoMoreInteractions(admin);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112485394


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });
 
         workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   This one is a little bit trickier, as we cannot do partial verification without resetting the mock. 
   
   What I ended up doing was checking that calls to `createOrFindTopics` happen twice midway, and verify the arguments, then check once again at the end of the test - this last verification is a cumulative of all 3 calls.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1098136639


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,93 +814,25 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(transformationChain.apply(any(SourceRecord.class)))
+            .thenAnswer((Answer<SourceRecord>) invocation -> invocation.getArgument(0));
+        when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA),
+            anyString()))
+            .thenAnswer((Answer<byte[]>) invocation -> {
+                String headerValue = invocation.getArgument(3, String.class);
+                return headerValue.getBytes(StandardCharsets.UTF_8);
+            });
+        when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(KEY_SCHEMA), eq(KEY)))
+            .thenReturn(SERIALIZED_KEY);
+        when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), eq(RECORD_SCHEMA),
+            eq(RECORD)))
+            .thenReturn(SERIALIZED_RECORD);
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
-        if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks)));
-
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
-
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
-
-        expectTaskGetTopic(anyTimes);
-
-        return sent;
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
-        return expectSendRecord(TOPIC, true, emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
-
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
-    }
-
-    private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }

Review Comment:
   Fair enough, I restored some of these methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13191:
URL: https://github.com/apache/kafka/pull/13191


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

Posted by "hgeraldino (via GitHub)" <gi...@apache.org>.
hgeraldino commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1445691840

   > 
   
   Added  verifications for the getTopic and topicCreation expectations in the few places where it was missing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org