You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/06/21 17:38:10 UTC

[camel] branch main updated: WireTapAbortPolicyTest is flaky

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9dd5d8061a1 WireTapAbortPolicyTest is flaky
9dd5d8061a1 is described below

commit 9dd5d8061a15165b1bcd88c00953a61fd891f2b8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 21 16:39:17 2023 +0200

    WireTapAbortPolicyTest is flaky
---
 .../camel/processor/WireTapAbortPolicyTest.java    | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 7eff82fdeab..0fbdc41cd8e 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -26,6 +29,7 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Isolated;
 
@@ -36,6 +40,10 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @Isolated
 public class WireTapAbortPolicyTest extends ContextTestSupport {
+
+    final CountDownLatch latch = new CountDownLatch(1);
+    final CyclicBarrier barrier = new CyclicBarrier(2);
+
     protected MockEndpoint tap;
     protected MockEndpoint result;
     protected ExecutorService pool;
@@ -50,6 +58,7 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
     }
 
     @Test
+    @RepeatedTest(value = 1000)
     public void testSend() throws Exception {
         // hello must come first, as we have delay on the tapped route
         result.expectedMinimumMessageCount(2);
@@ -62,6 +71,8 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
             fail("Task should be rejected");
         } catch (Exception e) {
             assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+        } finally {
+            latch.countDown();
         }
 
         assertMockEndpointsSatisfied();
@@ -79,13 +90,14 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() throws Exception {
+
                 // START SNIPPET: e1
                 // use a custom thread pool for sending tapped messages
                 ExecutorService pool = new ThreadPoolBuilder(context)
-                        // only allow 1 thread and 1 pending task
-                        .poolSize(1)
-                        .maxPoolSize(1)
-                        .maxQueueSize(1)
+                        // only allow 2 threads
+                        .poolSize(2)
+                        .maxPoolSize(2)
+                        .maxQueueSize(0)
                         // and about tasks
                         .rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
                         .build();
@@ -95,7 +107,12 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
                         .wireTap("direct:tap").executorService(pool).to("mock:result");
                 // END SNIPPET: e1
 
-                from("direct:tap").delay(1000).to("mock:tap");
+                from("direct:tap")
+                        .process(e -> {
+                            barrier.await(5, TimeUnit.SECONDS);
+                        })
+                        .process(e -> latch.await(5, TimeUnit.SECONDS))
+                        .to("mock:tap");
             }
         };
     }