You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/07/14 14:40:59 UTC
[camel] 03/04: (chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit cce62c2ce9977eeeb95305dd1611e07a566b0624
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jul 14 14:27:27 2023 +0200
(chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups
- removed unrelated test
- cleaned up the code
- isolate the test, as it requires more resources than the rest of the tests
- improve overall coordination of concurrent parties
---
.../RecipientListWithSimpleExpressionTest.java | 109 +++++++--------------
1 file changed, 36 insertions(+), 73 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
index fbda46e7a9a..acf289640f2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
@@ -16,108 +16,71 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.ExecutorService;
+import java.time.Duration;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Header;
import org.apache.camel.builder.RouteBuilder;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+@Isolated("This test creates a larger thread pool, which may be too much on slower hosts")
public class RecipientListWithSimpleExpressionTest extends ContextTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(RecipientListWithSimpleExpressionTest.class);
+ private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(10);
+ private final Phaser phaser = new Phaser(50);
@Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
- @Test
- public void testRecipientList() throws Exception {
- context.addRoutes(new RouteBuilder() {
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
@Override
- public void configure() throws Exception {
+ public void configure() {
from("direct:start").recipientList(simple("mock:${in.header.queue}"));
}
- });
- context.start();
- template.start();
+ };
+ }
- for (int i = 0; i < 10; i++) {
- getMockEndpoint("mock:" + i).expectedMessageCount(50);
- }
+ @BeforeEach
+ void sendMessages() {
+ // it may take a little while for the context to start on slower hosts
+ Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> context.getUptimeMillis() > 1000);
// use concurrent producers to send a lot of messages
- ExecutorService executors = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50; i++) {
- executors.execute(new Runnable() {
+ final Runnable runOverRunnable = new Runnable() {
+ int i;
+
+ @Override
public void run() {
- for (int i = 0; i < 10; i++) {
- try {
- template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
- Thread.sleep(5);
- } catch (Exception e) {
- // ignore
- }
+ template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
+ i++;
+ if (i == 10) {
+ i = 0;
}
}
- });
+ };
+ executors.scheduleAtFixedRate(runOverRunnable, 0, 50, TimeUnit.MILLISECONDS);
+ phaser.arrive();
}
-
- assertMockEndpointsSatisfied();
- executors.shutdownNow();
}
- public static class MyBeanRouter {
-
- @org.apache.camel.RecipientList
- public String route(@Header("queue") String queue) {
- return "mock:" + queue;
- }
- }
@Test
- public void testStatic() throws Exception {
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("direct:0").to("mock:0");
- from("direct:1").to("mock:1");
- from("direct:2").to("mock:2");
- from("direct:3").to("mock:3");
- from("direct:4").to("mock:4");
- from("direct:5").to("mock:5");
- from("direct:6").to("mock:6");
- from("direct:7").to("mock:7");
- from("direct:8").to("mock:8");
- from("direct:9").to("mock:9");
- }
- });
- context.start();
- template.start();
-
+ public void testRecipientList() throws InterruptedException, TimeoutException {
for (int i = 0; i < 10; i++) {
getMockEndpoint("mock:" + i).expectedMessageCount(50);
}
- // use concurrent producers to send a lot of messages
- ExecutorService executors = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 50; i++) {
- executors.execute(new Runnable() {
- public void run() {
- for (int i = 0; i < 10; i++) {
- try {
- template.sendBodyAndHeader("direct:" + i, "Hello " + i, "queue", i);
- Thread.sleep(5);
- } catch (Exception e) {
- // ignore
- }
- }
- }
- });
- }
-
+ phaser.awaitAdvanceInterruptibly(0, 5000, TimeUnit.SECONDS);
assertMockEndpointsSatisfied();
executors.shutdownNow();
}
-
}