Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/25 11:45:08 UTC

[GitHub] [kafka] tombentley commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

tombentley commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r928773599


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled and its producer has already been closed");

Review Comment:
   Is it necessarily true that the producer is already closed, given that closing it on cancellation is actually delegated to an executor?
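
   For context, here's a minimal sketch of the pattern being questioned (hypothetical names, not the actual `ExactlyOnceWorkerSourceTask` code): if `cancel()` merely submits the close to an executor, another thread can observe `isCancelled() == true` while the producer is still open.

       import java.time.Duration;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;

       import org.apache.kafka.clients.producer.Producer;

       class CancellableTask {
           private final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
           private volatile boolean cancelled = false;
           private final Producer<byte[], byte[]> producer;

           CancellableTask(Producer<byte[], byte[]> producer) {
               this.producer = producer;
           }

           public void cancel() {
               cancelled = true;
               // The close is only scheduled here; it may not have run yet by the
               // time another thread sees isCancelled() == true.
               closeExecutor.execute(() -> producer.close(Duration.ZERO));
           }

           public boolean isCancelled() {
               return cancelled;
           }
       }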



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record

Review Comment:
   Is the "just need to read one offset record" part of the comment still correct?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // also consume from the cluster's global offsets topic; again, just need to read one offset record

Review Comment:
   "just need to read one offset record" again seems suspect, given the change from `consume` to `consumeAll` below.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -826,15 +830,22 @@ public void testSeparateOffsetsTopic() throws Exception {
             assertConnectorStopped(connectorStop);
 
             // consume all records from the source topic or fail, to ensure that they were correctly produced
-            ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
                     Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-                    records.count() >= recordsProduced);
-            assertExactlyOnceSeqnos(records, numTasks);
+            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+                    sourceRecords.count() >= recordsProduced);
+            // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees

Review Comment:
   I don't quite follow the "no longer have EOS guarantees" part. That also made me notice that this test method (unlike the others in the class) doesn't have a Javadoc description of what exactly is being tested here.
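
   On the Javadoc point, something along these lines would help (the wording is only a suggestion based on my reading of what the test exercises):

       /**
        * Tests a source connector configured with a dedicated offsets topic,
        * verifying that offsets are committed to both that topic and the
        * cluster's global offsets topic, and that the expected records are
        * produced with the expected sequence numbers.
        */
       @Test
       public void testSeparateOffsetsTopic() throws Exception {
           // ...
       }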



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);

Review Comment:
   I know this is out of scope for this fix, but I noticed that the `recordNum >= recordsProduced` check seems to conflict with the `// consume all` part of the comment. Or perhaps the call to `consume` on line 760 should be `consumeAll` too?
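
   To make the distinction concrete, here is a sketch of the two calls using the signatures visible in this diff (the wrapper class and import paths are assumed scaffolding): `consume(n, ...)` blocks until at least n records arrive (or times out), while `consumeAll(...)` drains whatever is currently on the topic.

       import java.util.Collections;
       import java.util.Map;
       import java.util.concurrent.TimeUnit;

       import org.apache.kafka.clients.consumer.ConsumerConfig;
       import org.apache.kafka.clients.consumer.ConsumerRecord;
       import org.apache.kafka.clients.consumer.ConsumerRecords;
       import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

       class ConsumeVsConsumeAll {
           static void illustrate(EmbeddedKafkaCluster cluster, String offsetsTopic) throws Exception {
               Map<String, Object> readCommitted =
                       Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

               // consume(n, ...): block until at least n records arrive (here, a single offset record)
               ConsumerRecord<byte[], byte[]> one = cluster
                       .consume(1, TimeUnit.MINUTES.toMillis(1), readCommitted, offsetsTopic)
                       .iterator().next();

               // consumeAll(...): read every record currently on the topic, however many there are
               ConsumerRecords<byte[], byte[]> all = cluster
                       .consumeAll(TimeUnit.MINUTES.toMillis(1), readCommitted, null, offsetsTopic);
           }
       }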


