Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/19 08:57:47 UTC

[GitHub] [kafka] showuon opened a new pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

showuon opened a new pull request #11413:
URL: https://github.com/apache/kafka/pull/11413


   In https://github.com/apache/kafka/pull/9642, we removed the unnecessary `success` parameter and used the `error` value as the key to identify whether the commit succeeded or failed. However, there are some cases where we passed `success` as `false` without an `error` value. I think we should always pass the `error` value when a commit fails. This PR fixes that and adds tests.
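A minimal sketch of the convention described above, assuming a simplified metrics recorder. The class `ErrorAsKeyCommitMetrics` and its counters are hypothetical; only the `recordCommitSuccess`/`recordCommitFailure` names are taken from the diffs in this thread. Once the `success` flag is gone, a null `error` is read as success, so a failure path that omits the error is silently miscounted:

```java
import java.util.Objects;

// Hypothetical sketch, not Kafka Connect's actual metrics code: without a
// `success` flag, the error argument is the only signal that a commit failed.
class ErrorAsKeyCommitMetrics {
    private int successes = 0;
    private int failures = 0;

    // A null error is interpreted as success; any non-null error as failure.
    void recordCommit(long durationMs, Throwable error) {
        if (error == null) {
            successes++;
        } else {
            failures++;
        }
    }

    void recordCommitSuccess(long durationMs) {
        recordCommit(durationMs, null);
    }

    // Guard the failure path: a failed commit must always carry an error,
    // otherwise recordCommit would count it as a success.
    void recordCommitFailure(long durationMs, Throwable error) {
        recordCommit(durationMs, Objects.requireNonNull(error, "a failed commit must carry an error"));
    }

    int successes() { return successes; }

    int failures() { return failures; }

    public static void main(String[] args) {
        ErrorAsKeyCommitMetrics metrics = new ErrorAsKeyCommitMetrics();
        metrics.recordCommitSuccess(5);
        metrics.recordCommitFailure(7, new RuntimeException("flush timed out"));
        // The bug: a failure reported without an error is counted as a success.
        metrics.recordCommit(9, null);
        System.out.println("successes=" + metrics.successes() + ", failures=" + metrics.failures());
        // prints "successes=2, failures=1"
    }
}
```

The `requireNonNull` guard makes a missing error fail loudly instead of skewing the success metric.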
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r734370453



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -550,16 +552,16 @@ public boolean commitOffsets() {
         // Now we can actually flush the offsets to user storage.
         Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
             if (error != null) {
+                // Very rare case: offsets were unserializable, and unable to store any data
                 log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+                finishFailedFlush();
+                recordCommitFailure(time.milliseconds() - started, error);

Review comment:
       `doFlush` returns `null` after calling the callback with the error attached. Handle the failed flush here, since this is the only place where we know which error was thrown.
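A minimal sketch of the control flow described in this comment, assuming a hypothetical `doFlush` stand-in that reports unserializable offsets through the callback and returns `null`. The real offset writer completes a successful flush asynchronously; here it completes immediately for simplicity:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical sketch of the "callback with error, then return null" contract.
class FlushCallbackSketch {

    // Stand-in for the offset writer's flush: on a serialization failure it
    // invokes the callback with the error and returns null instead of a Future.
    static Future<Void> doFlush(boolean offsetsSerializable, BiConsumer<Throwable, Void> callback) {
        if (!offsetsSerializable) {
            callback.accept(new IllegalStateException("offsets were unserializable"), null);
            return null;
        }
        // Simplification: a real flush completes asynchronously.
        callback.accept(null, null);
        return CompletableFuture.completedFuture(null);
    }

    static String commitOffsets(boolean offsetsSerializable) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        Future<Void> flushFuture = doFlush(offsetsSerializable, (error, result) ->
                // Only the callback sees the actual error, so the failure is recorded here.
                outcome.set(error != null ? "failure: " + error.getMessage() : "success"));
        if (flushFuture == null) {
            // The callback has already run with the error; there is nothing to wait for.
            return outcome.get();
        }
        try {
            flushFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failure: interrupted";
        } catch (ExecutionException e) {
            return "failure: " + e.getCause();
        }
        return outcome.get();
    }
}
```

Because the caller only sees `null`, it cannot recover the cause. Recording the failure inside the callback is what keeps the error attached.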







[GitHub] [kafka] rhauch commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-970395759


   @showuon, thanks for trying to fix this issue. But I think the best course of action here is actually to revert #9642, for a few reasons.
   1. It was actually useful to have that boolean `success` parameter within the protected `TaskMetricsGroup.recordCommit(...)` helper method, since it allowed the `WorkerTask.recordCommitSuccess(...)` and `recordCommitFailure(...)` methods to call that one method with desired behavior.
   2. Rolling back is also more encapsulated and significantly easier to backport all the way back to the `2.8` branch, especially since we've significantly refactored the source connector offset commit logic only in `3.0` and later branches.
   
   So I think I'm going to revert #9642, but your additional unit tests here are also very useful. Would you mind creating a new PR with those unit test improvements? Thanks!
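For contrast, a sketch of the shape described in point 1, assuming simplified nested classes that stand in for `WorkerTask` and `TaskMetricsGroup`. The counters and percentage methods are hypothetical, not Connect's actual metric implementation. A single helper takes an explicit `success` flag, both public methods reuse it, and a failure is counted as a failure even when no error object is available:

```java
// Hypothetical sketch of a shared helper that takes an explicit success flag.
class CommitMetricsWithFlag {

    // Stand-in for TaskMetricsGroup: one helper records both outcomes.
    static class TaskMetricsGroup {
        private int attempts;
        private int successes;

        // durationMs would feed commit-time metrics; that part is omitted here.
        protected void recordCommit(long durationMs, boolean success) {
            attempts++;
            if (success) {
                successes++;
            }
        }

        double successPercentage() {
            return attempts == 0 ? 0.0 : (double) successes / attempts;
        }

        double failurePercentage() {
            return attempts == 0 ? 0.0 : (double) (attempts - successes) / attempts;
        }
    }

    // Stand-in for WorkerTask: each public method calls the helper with the desired flag.
    static class WorkerTask {
        final TaskMetricsGroup metrics = new TaskMetricsGroup();

        void recordCommitSuccess(long durationMs) {
            metrics.recordCommit(durationMs, true);
        }

        // The error is still useful for logging, but the flag alone decides the outcome.
        void recordCommitFailure(long durationMs, Throwable error) {
            metrics.recordCommit(durationMs, false);
        }
    }
}
```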
   
   





[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731647369



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -550,16 +552,16 @@ public boolean commitOffsets() {
         // Now we can actually flush the offsets to user storage.
         Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
             if (error != null) {
+                // Very rare case: offsets were unserializable and we finished immediately, unable to store
+                // any data
                 log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
+                finishFailedFlush();
+                recordCommitFailure(time.milliseconds() - started, error);
             } else {

Review comment:
       `doFlush` returns `null` after calling the callback with the `error` attached. Handle the failed flush here, since this is the only place where we know which error was thrown.







[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-961861164


   @UnityLung @chia7712 @rhauch, please help review this PR. Thank you.





[GitHub] [kafka] rhauch closed pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
rhauch closed pull request #11413:
URL: https://github.com/apache/kafka/pull/11413


   





[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731649801



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -1580,9 +1652,22 @@ private void assertPollMetrics(int minimumPollCountExpected) {
         assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d);
         double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
         double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
-        assertEquals(0, activeCount, 0.000001d);
-        if (minimumPollCountExpected > 0) {
-            assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+
+        if (isWriteCompleted) {
+            assertEquals(0, activeCount, 0.000001d);
+            if (minimumPollCountExpected > 0) {
+                assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+            }
+        }
+
+        double failurePercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-failure-percentage");
+        double successPercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-success-percentage");
+
+        if (!isCommitSucceeded) {
+            assertTrue(failurePercentage > 0);
+        } else {
+            assertTrue(failurePercentage == 0);
+            assertTrue(successPercentage > 0);

Review comment:
       Add `offset-commit-failure-percentage` and `offset-commit-success-percentage` metrics verification.







[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-946513668


   @chia7712 , could you please take a look? Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731648129



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -129,6 +129,7 @@
     // is used in the right place.
     private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
     private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
+    private static final long STOP_TIME_OUT = 10000;

Review comment:
       Sometimes stopping the worker task takes more than 1 second. Increase the timeout to 10 seconds to make the test reliable.
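A sketch of why a larger stop timeout does not slow the test down in the common case. It assumes the task signals shutdown through a `CountDownLatch`. The class below is a hypothetical stand-in; only `STOP_TIME_OUT` comes from the diff. `await` returns as soon as the latch is released, so the 10-second value is only an upper bound for slow runs:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a task that signals shutdown through a latch.
class AwaitStopSketch {
    static final long STOP_TIME_OUT = 10000;

    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Called by the task thread once it has fully stopped.
    void markStopped() {
        shutdownLatch.countDown();
    }

    // Returns true as soon as the task stops, or false if timeoutMs elapses first.
    boolean awaitStop(long timeoutMs) {
        try {
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A task that needs a little longer to stop fails a short wait but passes with `STOP_TIME_OUT`, without the test paying the full 10 seconds.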







[GitHub] [kafka] showuon commented on pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#issuecomment-952499102


   @UnityLung @chia7712 @rhauch , please help review this PR when available. Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r734369997



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -1580,9 +1652,23 @@ private void assertPollMetrics(int minimumPollCountExpected) {
         assertTrue(Double.isNaN(pollBatchTimeAvg) || pollBatchTimeAvg > 0.0d);
         double activeCount = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count");
         double activeCountMax = metrics.currentMetricValueAsDouble(sourceTaskGroup, "source-record-active-count-max");
-        assertEquals(0, activeCount, 0.000001d);
-        if (minimumPollCountExpected > 0) {
-            assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+
+        if (isWriteCompleted) {
+            assertEquals(0, activeCount, 0.000001d);
+            if (minimumPollCountExpected > 0) {
+                assertEquals(RECORDS.size(), activeCountMax, 0.000001d);
+            }
+        }
+
+        double failurePercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-failure-percentage");
+        double successPercentage = metrics.currentMetricValueAsDouble(taskGroup, "offset-commit-success-percentage");
+
+        if (!isCommitSucceeded) {
+            assertTrue(failurePercentage > 0);
+            assertTrue(successPercentage == 0);
+        } else {
+            assertTrue(failurePercentage == 0);
+            assertTrue(successPercentage > 0);

Review comment:
       Add `offset-commit-failure-percentage` and `offset-commit-success-percentage` metrics verification.
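These checks can be written as a small self-contained helper. It is sketched here without a JUnit dependency, and the class and method names are hypothetical. It compares against zero with the same `0.000001d` tolerance the test already uses for the active-count metrics. In the real test, JUnit's `assertEquals(0, failurePercentage, 0.000001d)` would express the zero check:

```java
// Hypothetical helper, self-contained so it does not depend on JUnit.
class CommitMetricAssertions {
    // Same tolerance the test uses for the active-count metrics.
    private static final double DELTA = 0.000001d;

    static void assertCommitPercentages(double failurePercentage, double successPercentage,
                                        boolean isCommitSucceeded) {
        if (isCommitSucceeded) {
            assertZero("offset-commit-failure-percentage", failurePercentage);
            assertPositive("offset-commit-success-percentage", successPercentage);
        } else {
            assertPositive("offset-commit-failure-percentage", failurePercentage);
            assertZero("offset-commit-success-percentage", successPercentage);
        }
    }

    // Written as negated comparisons so that NaN fails both checks.
    private static void assertZero(String metric, double value) {
        if (!(Math.abs(value) <= DELTA)) {
            throw new AssertionError(metric + ": expected 0 but was " + value);
        }
    }

    private static void assertPositive(String metric, double value) {
        if (!(value > DELTA)) {
            throw new AssertionError(metric + ": expected > 0 but was " + value);
        }
    }
}
```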







[GitHub] [kafka] showuon commented on a change in pull request #11413: KAFKA-13370: add errors when commit offsets failed and add tests

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11413:
URL: https://github.com/apache/kafka/pull/11413#discussion_r731645529



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -514,9 +514,11 @@ public boolean commitOffsets() {
                     // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
                     // we can stop flushing immediately
                     if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
+                        log.error("{} Failed to flush, task is cancelled, or timed out while waiting for producer " +
+                            "to flush outstanding {} messages", this, outstandingMessages.size());

Review comment:
       Side fix: the flush can fail because the task was cancelled, not only because it timed out, so mention both cases in the log.
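A sketch of the wait loop that this log message belongs to. It assumes a hypothetical monitor-based tracker of outstanding messages; the names are illustrative, not the actual `WorkerSourceTask` fields. The loop distinguishes the two exit reasons. In line with this PR, it returns a non-null error for every failure, so the caller never records a failed commit without a cause:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical tracker of outstanding messages; names are illustrative only.
class FlushWaitSketch {
    private int outstanding;
    private boolean cancelled;

    FlushWaitSketch(int outstanding) {
        this.outstanding = outstanding;
    }

    // Called when the producer acknowledges one message.
    synchronized void ack() {
        outstanding--;
        notifyAll();
    }

    synchronized void cancel() {
        cancelled = true;
        notifyAll();
    }

    // Returns null when all messages are flushed; otherwise a non-null error that
    // says whether the task was cancelled or the wait timed out.
    synchronized Throwable awaitFlush(long timeoutMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (outstanding > 0) {
            if (cancelled) {
                return new IllegalStateException(
                        "task was cancelled with " + outstanding + " outstanding messages");
            }
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
            if (remainingMs <= 0) {
                return new TimeoutException(
                        "timed out waiting for producer to flush " + outstanding + " outstanding messages");
            }
            try {
                // remainingMs is always > 0 here, so this never turns into an unbounded wait(0).
                wait(remainingMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return e;
            }
        }
        return null;
    }
}
```

Cancellation is checked first, so a cancelled task reports cancellation even if its deadline has also passed.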



