You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 16:33:11 UTC

[camel] 02/02: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8fb0cb3a1688cd515c4cdc89d3ade25d38948939
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 18:32:04 2022 +0200

    CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
 ...t.java => BatchConsumerPooledExchangeTest.java} | 44 +++++++++++++++++-----
 .../apache/camel/processor/PooledExchangeTest.java | 29 ++++++++++----
 2 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
similarity index 64%
copy from core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
index f771dc94aa5..0c5b017e2c2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
@@ -27,34 +27,59 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 
-public class PooledExchangeTest extends ContextTestSupport {
+public class BatchConsumerPooledExchangeTest extends ContextTestSupport {
 
     private final AtomicInteger counter = new AtomicInteger();
     private final AtomicReference<Exchange> ref = new AtomicReference<>();
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
-        return context;
+        ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+        ecc.setExchangeFactory(new PooledExchangeFactory());
+        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+        ecc.getExchangeFactory().setStatisticsEnabled(true);
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+        return ecc;
+    }
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        template.sendBodyAndHeader(fileUri(), "aaa", Exchange.FILE_NAME, "aaa.txt");
+        template.sendBodyAndHeader(fileUri(), "bbb", Exchange.FILE_NAME, "bbb.txt");
+        template.sendBodyAndHeader(fileUri(), "ccc", Exchange.FILE_NAME, "ccc.txt");
     }
 
     @Test
-    public void testSameExchange() throws Exception {
+    public void testNotSameExchange() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(2);
-        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3);
-        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4);
+        mock.expectedMessageCount(3);
+        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
         mock.message(0).header("first").isEqualTo(true);
         mock.message(1).header("first").isNull();
+        mock.message(2).header("first").isNull();
 
         context.getRouteController().startAllRoutes();
 
         assertMockEndpointsSatisfied();
+
+        PooledObjectFactory.Statistics stat
+                = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+        assertEquals(1, stat.getCreatedCounter());
+        assertEquals(2, stat.getAcquiredCounter());
+        assertEquals(3, stat.getReleasedCounter());
+        assertEquals(0, stat.getDiscardedCounter());
     }
 
     @Override
@@ -62,7 +87,8 @@ public class PooledExchangeTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=1&delay=1&repeatCount=2").noAutoStartup()
+                // maxMessagesPerPoll=1 picks up one file per poll, forcing 3 separate polls so pooled exchanges are used across polls
+                from(fileUri("?initialDelay=0&delay=10&maxMessagesPerPoll=1")).noAutoStartup()
                         .setProperty("myprop", counter::incrementAndGet)
                         .setHeader("myheader", counter::incrementAndGet)
                         .process(new Processor() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
index f771dc94aa5..e33c177aa1f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
@@ -27,8 +27,11 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 
 public class PooledExchangeTest extends ContextTestSupport {
@@ -38,23 +41,35 @@ public class PooledExchangeTest extends ContextTestSupport {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
-        return context;
+        ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+        ecc.setExchangeFactory(new PooledExchangeFactory());
+        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+        ecc.getExchangeFactory().setStatisticsEnabled(true);
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+        return ecc;
     }
 
     @Test
     public void testSameExchange() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(2);
-        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3);
-        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4);
+        mock.expectedMessageCount(3);
+        mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+        mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
         mock.message(0).header("first").isEqualTo(true);
         mock.message(1).header("first").isNull();
+        mock.message(2).header("first").isNull();
 
         context.getRouteController().startAllRoutes();
 
         assertMockEndpointsSatisfied();
+
+        PooledObjectFactory.Statistics stat
+                = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+        assertEquals(1, stat.getCreatedCounter());
+        assertEquals(2, stat.getAcquiredCounter());
+        assertEquals(3, stat.getReleasedCounter());
+        assertEquals(0, stat.getDiscardedCounter());
     }
 
     @Override
@@ -62,7 +77,7 @@ public class PooledExchangeTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:foo?period=1&delay=1&repeatCount=2").noAutoStartup()
+                from("timer:foo?period=1&delay=1&repeatCount=3").noAutoStartup()
                         .setProperty("myprop", counter::incrementAndGet)
                         .setHeader("myheader", counter::incrementAndGet)
                         .process(new Processor() {