Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/19 23:08:10 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #11083: KAFKA-13010: retry for tasks

wcarlson5 opened a new pull request #11083:
URL: https://github.com/apache/kafka/pull/11083


   If there is a cooperative rebalance, the tasks might not yet be assigned to a thread, so retrying should succeed within a very short timeframe.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
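
The retry idea described above can be sketched in isolation. This is a minimal, self-contained illustration and not the code from this PR: `RetrySketch` and `pollUntil` are hypothetical names, and the helper only approximates what a polling utility such as `TestUtils.waitForCondition` does.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a polling test utility; not part of Kafka.
final class RetrySketch {

    // Re-evaluates the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean pollUntil(final BooleanSupplier condition,
                             final long timeoutMs,
                             final long pollIntervalMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                // Restore the interrupt flag and give up instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(final String[] args) {
        final int[] calls = {0};
        // Simulates an assignment that only becomes visible on the third check.
        final boolean ok = pollUntil(() -> ++calls[0] >= 3, 1000L, 1L);
        System.out.println("condition met: " + ok + " after " + calls[0] + " checks");
    }
}
```

A one-shot assertion fails if it happens to run mid-rebalance, whereas a polling check like this tolerates a short window in which no task is assigned.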
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r673154289



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,10 +159,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
-        final List<TaskMetadata> taskMetadataList = kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
-        assertThat("only one task", taskMetadataList.size() == 1);
-        return taskMetadataList.get(0);
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        AtomicReference<List<TaskMetadata>> taskMetadataList = new AtomicReference<>();
+        TestUtils.waitForCondition( () -> {
+            taskMetadataList.set(kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList()));
+            return taskMetadataList.get().size() == 1;
+        }, "The active tasks returned was not one in the allotted time.");

Review comment:
       ```suggestion
           }, "The number of active tasks returned in the allotted time was not one.");
   ```







[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977


   @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked.





[GitHub] [kafka] cadonna commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r673163836



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,10 +159,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
-        final List<TaskMetadata> taskMetadataList = kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
-        assertThat("only one task", taskMetadataList.size() == 1);
-        return taskMetadataList.get(0);
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        AtomicReference<List<TaskMetadata>> taskMetadataList = new AtomicReference<>();

Review comment:
       Checkstyle says this variable needs to be `final`.







[GitHub] [kafka] cadonna commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672999766



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams
+                .metadataForLocalThreads()
+                .stream()
+                .mapToLong(t -> t.activeTasks().size())
+                .sum() == 1, "only one task");

Review comment:
       Could you please add a more descriptive failure message?







[GitHub] [kafka] jlprat commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672944792



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
       Correct me if I'm wrong; I just want to better understand the case you are raising. You mean keeping the `TaskMetadata` used for this test and then comparing it with the `activeTasks`, right?
   But I don't really understand how this would prevent discrepancies if the task has been revoked between calls. Wouldn't it then also be absent from `activeTasks`?
   
   Sorry to hijack this for this question and thanks in advance!







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r673136295



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
       @ableegoldman good point. I was trying to avoid referencing a variable inside a lambda but I can just use an atomic reference
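
For readers unfamiliar with the constraint mentioned here: a Java lambda may only capture local variables that are final or effectively final, so it cannot assign a new value to an enclosing local. A holder object such as `AtomicReference` works around this. The sketch below is illustrative only; `LambdaCaptureSketch`, `captureFromLambda`, and the task id string `"0_0"` are hypothetical and not taken from the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical example; the names here are not part of Kafka.
final class LambdaCaptureSketch {

    static List<String> captureFromLambda() {
        // This would not compile, because `result` is reassigned inside the lambda:
        //   List<String> result = null;
        //   BooleanSupplier bad = () -> { result = Arrays.asList("0_0"); return true; };

        // The reference itself is final; only the value it holds changes.
        final AtomicReference<List<String>> holder = new AtomicReference<>();
        final BooleanSupplier condition = () -> {
            holder.set(Arrays.asList("0_0"));
            return holder.get().size() == 1;
        };
        condition.getAsBoolean();
        // The value stored by the lambda is visible to the enclosing method.
        return holder.get();
    }

    public static void main(final String[] args) {
        System.out.println(captureFromLambda());
    }
}
```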







[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: retry for tasks

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-882919484


   @ableegoldman @jlprat @cadonna Can I get a review?





[GitHub] [kafka] ableegoldman merged pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11083:
URL: https://github.com/apache/kafka/pull/11083


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r672762836



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
       I'm sure the odds of this are super low, but to really make this airtight you'd need to have the `List<TaskMetadata>` that you use for the test be the same one that you test in this condition, otherwise it's _possible_ for the one task to have been revoked in the split second between `waitForCondition()` and the new call to `metadataForLocalThreads()`. 
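
The race described in this comment can be reproduced deterministically with a scripted data source. The sketch below is an illustration under assumptions, not the test code from the PR: `SnapshotRaceSketch`, `scripted`, `checkThenRefetch`, and `checkAndKeep` are hypothetical names, and the scripted supplier stands in for repeated calls to `metadataForLocalThreads()`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical example; the names here are not part of Kafka.
final class SnapshotRaceSketch {

    // Returns a supplier that yields the given answers one call at a time,
    // simulating an assignment that changes between two fetches.
    static Supplier<List<String>> scripted(final List<List<String>> answers) {
        final Iterator<List<String>> it = answers.iterator();
        return it::next;
    }

    // Racy: validates one fetch but returns the result of a second fetch.
    static List<String> checkThenRefetch(final Supplier<List<String>> fetch) {
        if (fetch.get().size() != 1) {
            throw new IllegalStateException("expected exactly one task");
        }
        return fetch.get(); // may observe a different assignment
    }

    // Safe: the list that was validated is the list that is returned.
    static List<String> checkAndKeep(final Supplier<List<String>> fetch) {
        final List<String> snapshot = fetch.get();
        if (snapshot.size() != 1) {
            throw new IllegalStateException("expected exactly one task");
        }
        return snapshot;
    }

    public static void main(final String[] args) {
        // First fetch sees one task; the task is "revoked" before the second fetch.
        final List<List<String>> answers =
            Arrays.asList(Arrays.asList("0_0"), Collections.<String>emptyList());
        System.out.println("refetch: " + checkThenRefetch(scripted(answers)));
        System.out.println("keep:    " + checkAndKeep(scripted(answers)));
    }
}
```

With the scripted answers, the first variant passes its check and still hands back an empty list, while the second returns the single task it validated.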







[GitHub] [kafka] jlprat commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
jlprat commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r673144945



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
       Got it, thanks! 







[GitHub] [kafka] ableegoldman commented on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-883733279


   Merged to trunk





[GitHub] [kafka] wcarlson5 edited a comment on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
wcarlson5 edited a comment on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977


   @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked.
   https://issues.apache.org/jira/browse/KAFKA-13215





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#discussion_r673137315



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
##########
@@ -158,9 +158,13 @@ public void shouldReportCorrectEndOffsetInformation() {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+        TestUtils.waitForCondition( () -> kafkaStreams

Review comment:
       @jlprat We are actually getting the `TaskMetadata` for that active task here and then making sure that the metadata is as expected.



