You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/29 16:27:45 UTC

[camel] branch main updated: (chores) More Thread.sleep cleanups (#7935)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 27f6d2bd93b (chores) More Thread.sleep cleanups (#7935)
27f6d2bd93b is described below

commit 27f6d2bd93bf7aebce7ce5f79f1c0b7cc7ed9ed9
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Jun 29 18:27:40 2022 +0200

    (chores) More Thread.sleep cleanups (#7935)
    
    * (chores) camel-hazelcast: replaced Thread.sleep with Awaitility
    
    * (chores) camel-hl7: replaced Thread.sleep with delay EIP
    
    * (chores) camel-irc: renamed manual test
    
    * (chores) camel-jetty: removed calls to Thread.sleep
    
    * (chores) camel-kubernetes: replaced Thread.sleep with Awaitility
    
    * (chores) camel-leveldb: replaced Thread.sleep with Awaitility
---
 components/camel-hazelcast/pom.xml                 |  5 +++
 .../HazelcastReplicatedmapConsumerTest.java        |  7 ++--
 .../hl7/HL7MLLPCodecMessageFloodingTest.java       |  4 +-
 ...IrcChat.java => CodehausIrcChatManualTest.java} |  6 +--
 .../jetty/JettyContinuationDisabledTest.java       | 11 ++----
 .../JettyEndpointContinuationDisabledTest.java     | 11 ++----
 .../cluster/KubernetesClusterServiceTest.java      |  7 ++--
 .../cluster/TimedLeaderNotifierTest.java           | 44 ++++++++++++----------
 ...bernetesReplicationControllersProducerTest.java | 10 ++---
 components/camel-leveldb/pom.xml                   |  5 +++
 .../LevelDBAggregateDiscardOnTimeoutTest.java      |  5 ++-
 ...DBAggregateNotLostRemovedWhenConfirmedTest.java |  8 +++-
 .../leveldb/LevelDBAggregateNotLostTest.java       |  8 +++-
 13 files changed, 73 insertions(+), 58 deletions(-)

diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml
index d34dc9e8520..4bc56279eea 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -79,6 +79,11 @@
             <artifactId>camel-spring-main</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
index 00e2db40b84..d1de844a98a 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -75,12 +76,12 @@ public class HazelcastReplicatedmapConsumerTest extends CamelTestSupport {
      * mail from talip (hazelcast) on 21.02.2011: MultiMap doesn't support eviction yet. We can and should add this feature.
      */
     @Test
-    public void testEvict() throws InterruptedException {
+    public void testEvict() {
         MockEndpoint out = getMockEndpoint("mock:evicted");
         out.expectedMessageCount(1);
         map.put("4711", "my-foo", 100, TimeUnit.MILLISECONDS);
-        Thread.sleep(150);
-        assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS);
+        Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).untilAsserted(
+                () -> assertMockEndpointsSatisfied());
     }
 
     @Test
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
index fabb6af27b0..2f9057712a7 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
@@ -53,8 +53,8 @@ public class HL7MLLPCodecMessageFloodingTest extends HL7TestSupport {
                     Message input = exchange.getIn().getBody(Message.class);
                     Message response = input.generateACK();
                     exchange.getMessage().setBody(response);
-                    Thread.sleep(50); // simulate some processing time
-                }).to("mock:result");
+                }).delay(50) // simulate some processing time
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChat.java b/components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChatManualTest.java
similarity index 97%
rename from components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChat.java
rename to components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChatManualTest.java
index f2ce7dd7f88..21ea3097576 100644
--- a/components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChat.java
+++ b/components/camel-irc/src/test/java/org/apache/camel/component/irc/CodehausIrcChatManualTest.java
@@ -25,9 +25,9 @@ import org.schwering.irc.lib.IRCUser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class CodehausIrcChat {
+public final class CodehausIrcChatManualTest {
 
-    private static final Logger LOG = LoggerFactory.getLogger(CodehausIrcChat.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CodehausIrcChatManualTest.class);
 
     private static final class CodehausIRCEventAdapter extends IRCEventAdapter {
         @Override
@@ -86,7 +86,7 @@ public final class CodehausIrcChat {
         }
     }
 
-    private CodehausIrcChat() {
+    private CodehausIrcChatManualTest() {
     }
 
     public static void main(String[] args) throws InterruptedException {
diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyContinuationDisabledTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyContinuationDisabledTest.java
index 0efda1c7646..5ed72dbcbbb 100644
--- a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyContinuationDisabledTest.java
+++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyContinuationDisabledTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.jetty;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
 
@@ -44,12 +42,9 @@ public class JettyContinuationDisabledTest extends BaseJettyTest {
                 JettyHttpComponent jetty = context.getComponent("jetty", JettyHttpComponent.class);
                 jetty.setUseContinuation(false);
 
-                from("jetty:http://localhost:{{port}}/myservice").process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        Thread.sleep(1000);
-                        exchange.getMessage().setBody("Bye World");
-                    }
-                }).to("mock:result");
+                from("jetty:http://localhost:{{port}}/myservice")
+                        .process(exchange -> exchange.getMessage().setBody("Bye World"))
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyEndpointContinuationDisabledTest.java b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyEndpointContinuationDisabledTest.java
index c31b1d3584a..e8248162f2b 100644
--- a/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyEndpointContinuationDisabledTest.java
+++ b/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettyEndpointContinuationDisabledTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.jetty;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.junit.jupiter.api.Test;
 
@@ -40,12 +38,9 @@ public class JettyEndpointContinuationDisabledTest extends BaseJettyTest {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from("jetty:http://localhost:{{port}}/myservice?useContinuation=false").process(new Processor() {
-                    public void process(Exchange exchange) throws Exception {
-                        Thread.sleep(1000);
-                        exchange.getMessage().setBody("Bye World");
-                    }
-                }).to("mock:result");
+                from("jetty:http://localhost:{{port}}/myservice?useContinuation=false")
+                        .process(exchange -> exchange.getMessage().setBody("Bye World"))
+                        .to("mock:result");
             }
         };
     }
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
index 01f05155965..4d264919f48 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/KubernetesClusterServiceTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -196,7 +197,7 @@ public class KubernetesClusterServiceTest extends CamelTestSupport {
 
     @ParameterizedTest
     @EnumSource(LeaseResourceType.class)
-    public void testSlowLeaderLosingLeadershipOnlyInternally(LeaseResourceType type) throws Exception {
+    public void testSlowLeaderLosingLeadershipOnlyInternally(LeaseResourceType type) {
         LeaderRecorder mypod1 = addMember("mypod1", type);
         LeaderRecorder mypod2 = addMember("mypod2", type);
         context.start();
@@ -211,8 +212,8 @@ public class KubernetesClusterServiceTest extends CamelTestSupport {
 
         delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS);
 
-        Thread.sleep(LEASE_TIME_MILLIS);
-        assertNull(formerLeaderRecorder.getCurrentLeader());
+        await().atMost(LEASE_TIME_MILLIS, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> assertNull(formerLeaderRecorder.getCurrentLeader()));
         assertEquals(firstLeader, formerLoserRecorder.getCurrentLeader());
     }
 
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/TimedLeaderNotifierTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/TimedLeaderNotifierTest.java
index 53c7adb2464..29143474da7 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/TimedLeaderNotifierTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/cluster/TimedLeaderNotifierTest.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEvent;
@@ -30,6 +31,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -67,59 +70,60 @@ public class TimedLeaderNotifierTest {
     }
 
     @Test
-    public void testMultipleCalls() throws Exception {
+    public void testMultipleCalls() {
         Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
         notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
         notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
         notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
-        Thread.sleep(80);
-        assertEquals(Optional.of("three"), currentLeader);
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.of("three"), currentLeader));
         assertEquals(members, currentMembers);
     }
 
     @Test
-    public void testExpiration() throws Exception {
+    public void testExpiration() {
         Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
         notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
         notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
-        Thread.sleep(160);
-        assertEquals(Optional.empty(), currentLeader);
+        await().atMost(160, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.empty(), currentLeader));
         assertEquals(members, currentMembers);
         notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
-        Thread.sleep(80);
-        assertEquals(Optional.of("three"), currentLeader);
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.of("three"), currentLeader));
         assertEquals(members, currentMembers);
     }
 
     @Test
-    public void testMemberChanging() throws Exception {
+    public void testMemberChanging() {
         Set<String> members1 = Collections.singleton("one");
         Set<String> members2 = new TreeSet<>(Arrays.asList("one", "two"));
         notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members1);
         notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 5000L, members2);
-        Thread.sleep(80);
-        assertEquals(Optional.of("two"), currentLeader);
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.of("two"), currentLeader));
         assertEquals(members2, currentMembers);
     }
 
     @Test
-    public void testOldData() throws Exception {
+    public void testOldData() {
         Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
         notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
-        Thread.sleep(80);
-        notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis() - 1000, 900L, members);
-        Thread.sleep(80);
-        assertEquals(Optional.empty(), currentLeader);
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(
+                () -> assertDoesNotThrow(() -> doRefreshLeadership("two", System.currentTimeMillis() - 1000, 900L, members)));
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.empty(), currentLeader));
+    }
+
+    private void doRefreshLeadership(String two, long timestamp, long lease, Set<String> members) {
+        notifier.refreshLeadership(Optional.of(two), timestamp, lease, members);
     }
 
     @Test
-    public void testNewLeaderEmpty() throws Exception {
+    public void testNewLeaderEmpty() {
         Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
         notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
-        Thread.sleep(80);
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertDoesNotThrow(() -> doRefreshLeadership(members)));
+        await().atMost(101, TimeUnit.MILLISECONDS).untilAsserted(() -> assertEquals(Optional.empty(), currentLeader));
+    }
+
+    private void doRefreshLeadership(Set<String> members) {
         notifier.refreshLeadership(Optional.empty(), null, null, members);
-        Thread.sleep(80);
-        assertEquals(Optional.empty(), currentLeader);
     }
 
 }
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/producer/KubernetesReplicationControllersProducerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/producer/KubernetesReplicationControllersProducerTest.java
index 4fcb4b23813..ffd87f95bc3 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/producer/KubernetesReplicationControllersProducerTest.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/producer/KubernetesReplicationControllersProducerTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kubernetes.producer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import io.fabric8.kubernetes.api.model.ReplicationController;
 import io.fabric8.kubernetes.api.model.ReplicationControllerBuilder;
@@ -30,6 +31,7 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.KubernetesServer;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesTestSupport;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -117,7 +119,7 @@ public class KubernetesReplicationControllersProducerTest extends KubernetesTest
     }
 
     @Test
-    public void createScaleAndDeleteReplicationController() throws Exception {
+    public void createScaleAndDeleteReplicationController() {
         server.expect().withPath("/api/v1/namespaces/test/replicationcontrollers/repl1")
                 .andReturn(200, new ReplicationControllerBuilder().withNewMetadata().withName("repl1")
                         .withResourceVersion("1").endMetadata().withNewSpec().withReplicas(5).endSpec().withNewStatus()
@@ -135,10 +137,8 @@ public class KubernetesReplicationControllersProducerTest extends KubernetesTest
             exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_REPLICATION_CONTROLLER_REPLICAS, 1);
         });
 
-        Thread.sleep(3000);
-        int replicas = ex.getMessage().getBody(Integer.class);
-
-        assertEquals(5, replicas);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(
+                () -> assertEquals(5, ex.getMessage().getBody(Integer.class)));
     }
 
     @Override
diff --git a/components/camel-leveldb/pom.xml b/components/camel-leveldb/pom.xml
index b676ddd7c2f..7317db53b22 100644
--- a/components/camel-leveldb/pom.xml
+++ b/components/camel-leveldb/pom.xml
@@ -95,6 +95,11 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateDiscardOnTimeoutTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateDiscardOnTimeoutTest.java
index 681b769c553..3e4ae9264f1 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateDiscardOnTimeoutTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateDiscardOnTimeoutTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.params.Test;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -45,8 +46,8 @@ public class LevelDBAggregateDiscardOnTimeoutTest extends LevelDBTestSupport {
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
 
-        // wait 3 seconds
-        Thread.sleep(3000);
+        // wait at most 3 seconds
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> mock.assertIsSatisfied());
 
         mock.assertIsSatisfied();
 
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
index 4cd9225eec3..d46bcb6c8f0 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostRemovedWhenConfirmedTest.java
@@ -16,12 +16,15 @@
  */
 package org.apache.camel.component.leveldb;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.params.Test;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -55,9 +58,10 @@ public class LevelDBAggregateNotLostRemovedWhenConfirmedTest extends LevelDBTest
 
         assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
 
-        Thread.sleep(1000);
+        final List<Exchange> receivedExchanges = Awaitility.await().atMost(1, TimeUnit.SECONDS)
+                .until(() -> getMockEndpoint("mock:result").getReceivedExchanges(), Matchers.notNullValue());
 
-        String exchangeId = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
+        String exchangeId = receivedExchanges.get(0).getExchangeId();
 
         // the exchange should NOT be in the completed repo as it was confirmed
         final LevelDBFile levelDBFile = repo.getLevelDBFile();
diff --git a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
index fda9e672978..e51eaddcc06 100644
--- a/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
+++ b/components/camel-leveldb/src/test/java/org/apache/camel/component/leveldb/LevelDBAggregateNotLostTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.camel.component.leveldb;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.params.Test;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -53,9 +56,10 @@ public class LevelDBAggregateNotLostTest extends LevelDBTestSupport {
 
         assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
 
-        Thread.sleep(1000);
+        final List<Exchange> receivedExchanges = Awaitility.await().atMost(1, TimeUnit.SECONDS)
+                .until(() -> getMockEndpoint("mock:aggregated").getReceivedExchanges(), Matchers.notNullValue());
 
-        String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
+        String exchangeId = receivedExchanges.get(0).getExchangeId();
 
         // the exchange should be in the completed repo where we should be able to find it
         final LevelDBFile levelDBFile = getRepo().getLevelDBFile();