You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/15 05:11:43 UTC

[hudi] branch master updated: [HUDI-4837] Stop sleeping where it is not necessary after the success (#6270)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d03e9a1b [HUDI-4837] Stop sleeping where it is not necessary after the success (#6270)
35d03e9a1b is described below

commit 35d03e9a1bede05d10f10c6e4b57ffe66ca7f330
Author: Volodymyr Burenin <vb...@gmail.com>
AuthorDate: Thu Sep 15 00:11:34 2022 -0500

    [HUDI-4837] Stop sleeping where it is not necessary after the success (#6270)
    
    Co-authored-by: Volodymyr Burenin <vo...@cloudkitchens.com>
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java     | 4 +++-
 .../java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java   | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 1e78610ced..81f06a0f9f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -315,7 +315,9 @@ public class KafkaOffsetGen {
       // TODO(HUDI-4625) cleanup, introduce retrying client
       partitionInfos = consumer.partitionsFor(topicName);
       try {
-        TimeUnit.SECONDS.sleep(10);
+        if (partitionInfos == null) {
+          TimeUnit.SECONDS.sleep(10);
+        }
       } catch (InterruptedException e) {
         LOG.error("Sleep failed while fetching partitions");
       }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 2b99f19b27..1147736143 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -249,7 +249,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
     // create a topic with very short retention
     final String topic = TEST_TOPIC_PREFIX + "testFailOnDataLoss";
     Properties topicConfig = new Properties();
-    topicConfig.setProperty("retention.ms", "10000");
+    topicConfig.setProperty("retention.ms", "8000");
     testUtils.createTopic(topic, 1, topicConfig);
 
     TypedProperties failOnDataLossProps = createPropsForKafkaSource(topic, null, "earliest");
@@ -261,7 +261,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
     assertEquals(2, fetch1.getBatch().get().count());
 
     // wait for the checkpoint to expire
-    Thread.sleep(10001);
+    Thread.sleep(30000);
     Throwable t = assertThrows(HoodieDeltaStreamerException.class, () -> {
       kafkaSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
     });