You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/04/13 08:32:52 UTC

[kafka] branch 3.2 updated: KAFKA-10405: Set purge interval explicitly in PurgeRepartitionTopicIntegrationTest (#11948)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 821275e6b3 KAFKA-10405: Set purge interval explicitly in PurgeRepartitionTopicIntegrationTest (#11948)
821275e6b3 is described below

commit 821275e6b3af146de8bab65107985c65c863f09f
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Sat Mar 26 00:30:02 2022 +0800

    KAFKA-10405: Set purge interval explicitly in PurgeRepartitionTopicIntegrationTest (#11948)
    
    In KIP-811, we added a new config, repartition.purge.interval.ms, to set the repartition purge interval. This flaky test assumed that the purge interval was the same as the commit interval, which is no longer correct (the purge interval now defaults to 30 seconds). Set the purge interval explicitly to fix this issue.
    
    Reviewers: Bruno Cadonna <ca...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/integration/PurgeRepartitionTopicIntegrationTest.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index ffb35312ea..37d8743521 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -161,6 +161,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS);
+        streamsConfiguration.put(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
@@ -203,10 +204,11 @@ public class PurgeRepartitionTopicIntegrationTest {
         TestUtils.waitForCondition(new RepartitionTopicCreatedWithExpectedConfigs(), 60000,
                 "Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after 60000 ms.");
 
+        // wait until we received more than 1 segment of data, so that we can confirm the purge succeeds in next verification
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize > 0),
+            new RepartitionTopicVerified(currentSize -> currentSize > PURGE_SEGMENT_BYTES),
             60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
+            "Repartition topic " + REPARTITION_TOPIC + " not received more than " + PURGE_SEGMENT_BYTES + "B of data after 60000 ms."
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side