You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/07/15 08:58:16 UTC
[camel] 01/02: Revert "(chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups"
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 159d3d6106a2dca565806ca252f8b7e4822860d5
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jul 15 09:34:24 2023 +0200
Revert "(chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups"
This reverts commit cce62c2ce9977eeeb95305dd1611e07a566b0624.
---
.../RecipientListWithSimpleExpressionTest.java | 110 ++++++++++++++-------
1 file changed, 74 insertions(+), 36 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
index 4761b06f928..fbda46e7a9a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
@@ -16,70 +16,108 @@
*/
package org.apache.camel.processor;
-import java.time.Duration;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Header;
import org.apache.camel.builder.RouteBuilder;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.parallel.Isolated;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-@Isolated("This test creates a larger thread pool, which may be too much on slower hosts")
public class RecipientListWithSimpleExpressionTest extends ContextTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(RecipientListWithSimpleExpressionTest.class);
- private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(10);
- private final Phaser phaser = new Phaser(50);
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testRecipientList() throws Exception {
+ context.addRoutes(new RouteBuilder() {
@Override
- public void configure() {
+ public void configure() throws Exception {
from("direct:start").recipientList(simple("mock:${in.header.queue}"));
}
- };
- }
+ });
+ context.start();
+ template.start();
- @BeforeEach
- void sendMessages() {
- // it may take a little while for the context to start on slower hosts
- Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> context.getUptimeMillis() > 1000);
+ for (int i = 0; i < 10; i++) {
+ getMockEndpoint("mock:" + i).expectedMessageCount(50);
+ }
// use concurrent producers to send a lot of messages
+ ExecutorService executors = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50; i++) {
- final Runnable runOverRunnable = new Runnable() {
- int i;
-
- @Override
+ executors.execute(new Runnable() {
public void run() {
- template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
- i++;
- if (i == 10) {
- i = 0;
+ for (int i = 0; i < 10; i++) {
+ try {
+ template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
+ Thread.sleep(5);
+ } catch (Exception e) {
+ // ignore
+ }
}
}
- };
- executors.scheduleAtFixedRate(runOverRunnable, 0, 50, TimeUnit.MILLISECONDS);
- phaser.arrive();
+ });
+ }
+
+ assertMockEndpointsSatisfied();
+ executors.shutdownNow();
+ }
+
+ public static class MyBeanRouter {
+
+ @org.apache.camel.RecipientList
+ public String route(@Header("queue") String queue) {
+ return "mock:" + queue;
}
}
@Test
- public void testRecipientList() throws InterruptedException, TimeoutException {
+ public void testStatic() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:0").to("mock:0");
+ from("direct:1").to("mock:1");
+ from("direct:2").to("mock:2");
+ from("direct:3").to("mock:3");
+ from("direct:4").to("mock:4");
+ from("direct:5").to("mock:5");
+ from("direct:6").to("mock:6");
+ from("direct:7").to("mock:7");
+ from("direct:8").to("mock:8");
+ from("direct:9").to("mock:9");
+ }
+ });
+ context.start();
+ template.start();
+
for (int i = 0; i < 10; i++) {
getMockEndpoint("mock:" + i).expectedMessageCount(50);
}
- phaser.awaitAdvanceInterruptibly(0, 5000, TimeUnit.SECONDS);
+ // use concurrent producers to send a lot of messages
+ ExecutorService executors = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 50; i++) {
+ executors.execute(new Runnable() {
+ public void run() {
+ for (int i = 0; i < 10; i++) {
+ try {
+ template.sendBodyAndHeader("direct:" + i, "Hello " + i, "queue", i);
+ Thread.sleep(5);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ });
+ }
+
assertMockEndpointsSatisfied();
executors.shutdownNow();
}
+
}