You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 16:33:11 UTC
[camel] 02/02: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8fb0cb3a1688cd515c4cdc89d3ade25d38948939
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 18:32:04 2022 +0200
CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
...t.java => BatchConsumerPooledExchangeTest.java} | 44 +++++++++++++++++-----
.../apache/camel/processor/PooledExchangeTest.java | 29 ++++++++++----
2 files changed, 57 insertions(+), 16 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
similarity index 64%
copy from core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
copy to core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
index f771dc94aa5..0c5b017e2c2 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/BatchConsumerPooledExchangeTest.java
@@ -27,34 +27,59 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
-public class PooledExchangeTest extends ContextTestSupport {
+public class BatchConsumerPooledExchangeTest extends ContextTestSupport {
private final AtomicInteger counter = new AtomicInteger();
private final AtomicReference<Exchange> ref = new AtomicReference<>();
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
- return context;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+ ecc.setExchangeFactory(new PooledExchangeFactory());
+ ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+ ecc.getExchangeFactory().setStatisticsEnabled(true);
+ ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+ return ecc;
+ }
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ template.sendBodyAndHeader(fileUri(), "aaa", Exchange.FILE_NAME, "aaa.txt");
+ template.sendBodyAndHeader(fileUri(), "bbb", Exchange.FILE_NAME, "bbb.txt");
+ template.sendBodyAndHeader(fileUri(), "ccc", Exchange.FILE_NAME, "ccc.txt");
}
@Test
- public void testSameExchange() throws Exception {
+ public void testNotSameExchange() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(2);
- mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3);
- mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4);
+ mock.expectedMessageCount(3);
+ mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+ mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
mock.message(0).header("first").isEqualTo(true);
mock.message(1).header("first").isNull();
+ mock.message(2).header("first").isNull();
context.getRouteController().startAllRoutes();
assertMockEndpointsSatisfied();
+
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(2, stat.getAcquiredCounter());
+ assertEquals(3, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
}
@Override
@@ -62,7 +87,8 @@ public class PooledExchangeTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=1&delay=1&repeatCount=2").noAutoStartup()
+ // maxMessagesPerPoll=1 forces three separate polls (one file each), so the pooled exchange is reused across polls
+ from(fileUri("?initialDelay=0&delay=10&maxMessagesPerPoll=1")).noAutoStartup()
.setProperty("myprop", counter::incrementAndGet)
.setHeader("myheader", counter::incrementAndGet)
.process(new Processor() {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
index f771dc94aa5..e33c177aa1f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/PooledExchangeTest.java
@@ -27,8 +27,11 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
public class PooledExchangeTest extends ContextTestSupport {
@@ -38,23 +41,35 @@ public class PooledExchangeTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
- return context;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+ ecc.setExchangeFactory(new PooledExchangeFactory());
+ ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+ ecc.getExchangeFactory().setStatisticsEnabled(true);
+ ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+ return ecc;
}
@Test
public void testSameExchange() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(2);
- mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3);
- mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4);
+ mock.expectedMessageCount(3);
+ mock.expectedPropertyValuesReceivedInAnyOrder("myprop", 1, 3, 5);
+ mock.expectedHeaderValuesReceivedInAnyOrder("myheader", 2, 4, 6);
mock.message(0).header("first").isEqualTo(true);
mock.message(1).header("first").isNull();
+ mock.message(2).header("first").isNull();
context.getRouteController().startAllRoutes();
assertMockEndpointsSatisfied();
+
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(2, stat.getAcquiredCounter());
+ assertEquals(3, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
}
@Override
@@ -62,7 +77,7 @@ public class PooledExchangeTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("timer:foo?period=1&delay=1&repeatCount=2").noAutoStartup()
+ from("timer:foo?period=1&delay=1&repeatCount=3").noAutoStartup()
.setProperty("myprop", counter::incrementAndGet)
.setHeader("myheader", counter::incrementAndGet)
.process(new Processor() {