You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/07/14 14:40:59 UTC

[camel] 03/04: (chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cce62c2ce9977eeeb95305dd1611e07a566b0624
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jul 14 14:27:27 2023 +0200

    (chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups
    
    - removed unrelated test
    - cleaned up the code
    - isolate the test, as it requires more resources than the rest of the tests
    - improve overall coordination of concurrent parties
---
 .../RecipientListWithSimpleExpressionTest.java     | 109 +++++++--------------
 1 file changed, 36 insertions(+), 73 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
index fbda46e7a9a..acf289640f2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java
@@ -16,108 +16,71 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.ExecutorService;
+import java.time.Duration;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Header;
 import org.apache.camel.builder.RouteBuilder;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+@Isolated("This test creates a larger thread pool, which may be too much on slower hosts")
 public class RecipientListWithSimpleExpressionTest extends ContextTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RecipientListWithSimpleExpressionTest.class);
+    private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(10);
+    private final Phaser phaser = new Phaser(50);
 
     @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
-
-    @Test
-    public void testRecipientList() throws Exception {
-        context.addRoutes(new RouteBuilder() {
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
             @Override
-            public void configure() throws Exception {
+            public void configure() {
                 from("direct:start").recipientList(simple("mock:${in.header.queue}"));
             }
-        });
-        context.start();
-        template.start();
+        };
+    }
 
-        for (int i = 0; i < 10; i++) {
-            getMockEndpoint("mock:" + i).expectedMessageCount(50);
-        }
+    @BeforeEach
+    void sendMessages() {
+        // it may take a little while for the context to start on slower hosts
+        Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> context.getUptimeMillis() > 1000);
 
         // use concurrent producers to send a lot of messages
-        ExecutorService executors = Executors.newFixedThreadPool(10);
         for (int i = 0; i < 50; i++) {
-            executors.execute(new Runnable() {
+            final Runnable runOverRunnable = new Runnable() {
+                int i;
+
+                @Override
                 public void run() {
-                    for (int i = 0; i < 10; i++) {
-                        try {
-                            template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
-                            Thread.sleep(5);
-                        } catch (Exception e) {
-                            // ignore
-                        }
+                    template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i);
+                    i++;
+                    if (i == 10) {
+                        i = 0;
                     }
                 }
-            });
+            };
+            executors.scheduleAtFixedRate(runOverRunnable, 0, 50, TimeUnit.MILLISECONDS);
+            phaser.arrive();
         }
-
-        assertMockEndpointsSatisfied();
-        executors.shutdownNow();
     }
 
-    public static class MyBeanRouter {
-
-        @org.apache.camel.RecipientList
-        public String route(@Header("queue") String queue) {
-            return "mock:" + queue;
-        }
-    }
 
     @Test
-    public void testStatic() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:0").to("mock:0");
-                from("direct:1").to("mock:1");
-                from("direct:2").to("mock:2");
-                from("direct:3").to("mock:3");
-                from("direct:4").to("mock:4");
-                from("direct:5").to("mock:5");
-                from("direct:6").to("mock:6");
-                from("direct:7").to("mock:7");
-                from("direct:8").to("mock:8");
-                from("direct:9").to("mock:9");
-            }
-        });
-        context.start();
-        template.start();
-
+    public void testRecipientList() throws InterruptedException, TimeoutException {
         for (int i = 0; i < 10; i++) {
             getMockEndpoint("mock:" + i).expectedMessageCount(50);
         }
 
-        // use concurrent producers to send a lot of messages
-        ExecutorService executors = Executors.newFixedThreadPool(10);
-        for (int i = 0; i < 50; i++) {
-            executors.execute(new Runnable() {
-                public void run() {
-                    for (int i = 0; i < 10; i++) {
-                        try {
-                            template.sendBodyAndHeader("direct:" + i, "Hello " + i, "queue", i);
-                            Thread.sleep(5);
-                        } catch (Exception e) {
-                            // ignore
-                        }
-                    }
-                }
-            });
-        }
-
+        phaser.awaitAdvanceInterruptibly(0, 5000, TimeUnit.SECONDS);
         assertMockEndpointsSatisfied();
         executors.shutdownNow();
     }
-
 }