Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/21 22:56:42 UTC

[GitHub] [kafka] C0urante opened a new pull request, #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

C0urante opened a new pull request, #12429:
URL: https://github.com/apache/kafka/pull/12429

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14089)
   
   This is a potential fix for the flakiness in the `ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic` test. There are also a few minor improvements to prevent unnecessary `ERROR`-level log messages during shutdown of cancelled exactly-once source tasks, and unnecessary `WARN`-level log messages when creating exactly-once source task producers.
   
   As the title indicates, this fix makes the test resilient to unclean task and worker shutdown by only verifying data emitted by the source tasks up to the latest committed offset. Data after that point may exist in the topic written to by the task, but it does not have to be accurate: once exactly-once support is disabled, only at-least-once delivery is guaranteed.
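
   As a rough illustration of that weakened check (hypothetical helper and names, 1-based seqnos assumed; not the actual test code):

   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Set;

   import static org.junit.Assert.assertTrue;

   // Sketch: only seqnos up to each task's latest committed offset must be
   // present. Gaps below the committed point are failures; duplicates and
   // uncommitted trailing records are tolerated (at-least-once).
   static void assertSeqnosUpToCommitted(Map<String, Long> committedSeqnoByTask,
                                         Map<String, Set<Long>> observedSeqnosByTask) {
       committedSeqnoByTask.forEach((taskId, committed) -> {
           Set<Long> observed = observedSeqnosByTask.getOrDefault(taskId, Collections.emptySet());
           for (long seqno = 1; seqno <= committed; seqno++) {
               assertTrue("Missing seqno " + seqno + " for task " + taskId,
                       observed.contains(seqno));
           }
       });
   }
   ```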
   
   Full disclosure: I haven't encountered any failures while running this locally, but I haven't been able to replicate the failures described in the ticket either.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
mimaison commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1198315774

   I've backported this to 3.3: https://github.com/apache/kafka/commit/dc0866557c5051b8711df67d68621a3306521b10


[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381705


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -826,15 +830,22 @@ public void testSeparateOffsetsTopic() throws Exception {
             assertConnectorStopped(connectorStop);
 
             // consume all records from the source topic or fail, to ensure that they were correctly produced
-            ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
                     Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-                    records.count() >= recordsProduced);
-            assertExactlyOnceSeqnos(records, numTasks);
+            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " but got " + sourceRecords.count(),
+                    sourceRecords.count() >= recordsProduced);
+            // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees

Review Comment:
   Good point; added a Javadoc.
   
   I'll note that, while writing it, it occurred to me that we could strengthen some of the guarantees in the test. For example, we don't assert that the connector has actually committed any new offsets after bringing up the cluster again with exactly-once support disabled. But I'm reluctant to address these immediately: getting them wrong may introduce additional flakiness, and this PR is likely to be backported to 3.3 in order to improve stability while generating release candidates. If you believe these additional guarantees are worth pursuing, though, let me know and I can file a separate follow-up knowing that it's targeted exclusively at trunk.
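
   For what that follow-up might look like (purely illustrative, not part of this PR; `seqnosBefore` and `seqnosAfter` are assumed `List<Long>` values parsed from the offsets topic before and after the restart):

   ```java
   // Hypothetical extra assertion: the latest committed seqno should strictly
   // advance after the cluster is restarted with exactly-once support disabled.
   long maxBefore = seqnosBefore.stream().mapToLong(Long::longValue).max().orElse(-1L);
   long maxAfter = seqnosAfter.stream().mapToLong(Long::longValue).max().orElse(-1L);
   assertTrue("Connector should commit new offsets after the restart",
           maxAfter > maxBefore);
   ```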



[GitHub] [kafka] C0urante commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1198319088

   Thanks Mickael!


[GitHub] [kafka] mimaison merged pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
mimaison merged PR #12429:
URL: https://github.com/apache/kafka/pull/12429


[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381599


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);

Review Comment:
   Ahh, good catch--the "all" is incorrect in that comment. I've updated it from "consume all records" to "consume at least the expected number of records".
   
   On a separate note, I believe a similar mixup is the cause of https://issues.apache.org/jira/browse/KAFKA-14101, where we _should_ be using `consumeAll` but aren't at the moment. I've pushed a commit that should fix that.
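
   For context, the distinction in play (sketched against the embedded-cluster calls visible in this diff; treat the signatures as approximate):

   ```java
   // consume(n, ...) returns once n records have been read, so it can only
   // prove "at least n" were produced; consumeAll(...) keeps reading up to the
   // topic's current end offsets, which is what "consume all records" implies.
   ConsumerRecords<byte[], byte[]> everything = connectorTargetedCluster.consumeAll(
           TimeUnit.MINUTES.toMillis(1),
           Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
           null,
           topic
   );
   ```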



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // also consume from the cluster's global offsets topic; again, just need to read one offset record

Review Comment:
   Right again, thanks for the catch!



[GitHub] [kafka] C0urante commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r929381491


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled and its producer has already been closed");

Review Comment:
   That's fair; technically, it may not be closed if we hit this part right after `cancelled` is flipped to `true` but before the producer is actually closed. I'd personally err on the side of skipping the offset commit if the task is cancelled: offset commit failure messages are scary to see, and the odds of a commit being viable once we know the task has been cancelled are low. But I'm not too attached to this (especially on a flaky-test PR). What are your thoughts?
   
   FWIW, I've also updated the log message to remove the explanation of why we're skipping the commit; the detail about the producer being closed doesn't seem strictly necessary.
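
   To spell out the window being discussed, a standalone sketch with hypothetical names (not the actual `ExactlyOnceWorkerSourceTask` code):

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   class CancellationRaceSketch {
       private final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
       private volatile boolean cancelled = false;
       private final AutoCloseable producer; // stand-in for the task's transactional producer

       CancellationRaceSketch(AutoCloseable producer) {
           this.producer = producer;
       }

       void cancel() {
           cancelled = true; // immediately visible to finalOffsetCommit()
           // The close is delegated to an executor, so there is a window where
           // cancelled == true but the producer is still open.
           closeExecutor.execute(() -> {
               try {
                   producer.close();
               } catch (Exception e) {
                   // ignored in this sketch
               }
           });
       }

       void finalOffsetCommit(boolean failed) {
           if (failed || cancelled) {
               // Skip the commit either way: even if the producer happens to
               // still be open, a commit is unlikely to be viable once the
               // task has been cancelled.
               return;
           }
           // ... commit the transaction here ...
       }
   }
   ```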



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record

Review Comment:
   Nope, thanks for the catch!



[GitHub] [kafka] mimaison commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
mimaison commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1195440012

   @tombentley Do you want to take another look?


[GitHub] [kafka] mimaison commented on pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
mimaison commented on PR #12429:
URL: https://github.com/apache/kafka/pull/12429#issuecomment-1195330919

   I've run the tests a dozen times and I've not seen any failures.


[GitHub] [kafka] tombentley commented on a diff in pull request #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

Posted by GitBox <gi...@apache.org>.
tombentley commented on code in PR #12429:
URL: https://github.com/apache/kafka/pull/12429#discussion_r928773599


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -217,6 +217,9 @@ protected void finalOffsetCommit(boolean failed) {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled and its producer has already been closed");

Review Comment:
   Is it necessarily true that the producer is already closed, given that the closure on cancellation is actually delegated to an executor?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record

Review Comment:
   Is the "just need to read one offset record" part of the comment still correct?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);
 
             // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // also consume from the cluster's global offsets topic; again, just need to read one offset record

Review Comment:
   "just need to read one offset record" again seems suspect, given the change from `consume` to `consumeAll` below.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -826,15 +830,22 @@ public void testSeparateOffsetsTopic() throws Exception {
             assertConnectorStopped(connectorStop);
 
             // consume all records from the source topic or fail, to ensure that they were correctly produced
-            ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
                     Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-                    records.count() >= recordsProduced);
-            assertExactlyOnceSeqnos(records, numTasks);
+            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " but got " + sourceRecords.count(),
+                    sourceRecords.count() >= recordsProduced);
+            // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees

Review Comment:
   I don't quite follow the "no longer have EOS guarantees" part. And that made me notice that this test method (unlike the others in the class) doesn't have a javadoc description about what exactly is being tested here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -768,27 +767,32 @@ public void testSeparateOffsetsTopic() throws Exception {
                     recordNum >= recordsProduced);

Review Comment:
   I know this is out of scope for this fix, but I noticed that the `recordNum >= recordsProduced` check seems to conflict with the `// consume all` part of the comment. Or perhaps the call to `consume` on line 760 should be `consumeAll` too?


