You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2021/08/18 17:06:49 UTC
[kafka] branch 3.0 updated: KAFKA-13010: Retry getting tasks incase
of rebalance for TaskMetadata tests (#11083)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0999393 KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests (#11083)
0999393 is described below
commit 09993933cb9c9291a916d59ec3c6d9544ad1527a
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Tue Jul 20 16:56:07 2021 -0500
KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests (#11083)
If there is a cooperative rebalance, the tasks might not be assigned to a thread at all for a very short timeframe, causing this test to fail. We can just retry getting the metadata until the group has finished rebalancing and all tasks are assigned.
Reviewers: Bruno Cadonna <ca...@apache.org>, Anna Sophie Blee-Goldman <ab...@apache.org>, Josep Prat <jo...@aiven.io>
---
.../streams/integration/TaskMetadataIntegrationTest.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 8639242..6f35d12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -158,10 +159,13 @@ public class TaskMetadataIntegrationTest {
}
}
- private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
- final List<TaskMetadata> taskMetadataList = kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList());
- assertThat("only one task", taskMetadataList.size() == 1);
- return taskMetadataList.get(0);
+ private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) throws InterruptedException {
+ final AtomicReference<List<TaskMetadata>> taskMetadataList = new AtomicReference<>();
+ TestUtils.waitForCondition(() -> {
+ taskMetadataList.set(kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> t.activeTasks().stream()).collect(Collectors.toList()));
+ return taskMetadataList.get().size() == 1;
+ }, "The number of active tasks returned in the allotted time was not one.");
+ return taskMetadataList.get().get(0);
}
@After