You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/06/21 14:40:06 UTC
[camel] 01/01: WireTapAbortPolicyTest is flaky
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch wiretap
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4ec85c5309cac2beca17455a5b352dbe2fcc62f9
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jun 21 16:39:17 2023 +0200
WireTapAbortPolicyTest is flaky
---
.../camel/processor/WireTapAbortPolicyTest.java | 27 ++++++++++++++++++----
1 file changed, 22 insertions(+), 5 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
index 7eff82fdeab..0fbdc41cd8e 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapAbortPolicyTest.java
@@ -16,8 +16,11 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -26,6 +29,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
@@ -36,6 +40,10 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
@Isolated
public class WireTapAbortPolicyTest extends ContextTestSupport {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
protected MockEndpoint tap;
protected MockEndpoint result;
protected ExecutorService pool;
@@ -50,6 +58,7 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
}
@Test
+ @RepeatedTest(value = 1000)
public void testSend() throws Exception {
// hello must come first, as we have delay on the tapped route
result.expectedMinimumMessageCount(2);
@@ -62,6 +71,8 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
fail("Task should be rejected");
} catch (Exception e) {
assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+ } finally {
+ latch.countDown();
}
assertMockEndpointsSatisfied();
@@ -79,13 +90,14 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() throws Exception {
+
// START SNIPPET: e1
// use a custom thread pool for sending tapped messages
ExecutorService pool = new ThreadPoolBuilder(context)
- // only allow 1 thread and 1 pending task
- .poolSize(1)
- .maxPoolSize(1)
- .maxQueueSize(1)
+ // only allow 2 threads
+ .poolSize(2)
+ .maxPoolSize(2)
+ .maxQueueSize(0)
// and about tasks
.rejectedPolicy(ThreadPoolRejectedPolicy.Abort)
.build();
@@ -95,7 +107,12 @@ public class WireTapAbortPolicyTest extends ContextTestSupport {
.wireTap("direct:tap").executorService(pool).to("mock:result");
// END SNIPPET: e1
- from("direct:tap").delay(1000).to("mock:tap");
+ from("direct:tap")
+ .process(e -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ })
+ .process(e -> latch.await(5, TimeUnit.SECONDS))
+ .to("mock:tap");
}
};
}