You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 19:42:06 UTC
[camel] 03/03: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f2eb5cb8decf18a98228ca2a75417bf8ce768b15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 21:41:47 2022 +0200
CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
.../component/jms/JmsInOnlyPooledExchangeTest.java | 37 +++++++++++++++++++---
.../http/NettyHttpSimplePooledExchangeTest.java | 31 ++++++++++++++++--
.../sjms/consumer/InOnlyPooledExchangeTest.java | 36 +++++++++++++++++++--
3 files changed, 94 insertions(+), 10 deletions(-)
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
index e05dadd1aad..60f87e05a70 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.jms;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.ConnectionFactory;
import org.apache.camel.CamelContext;
@@ -23,10 +25,14 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
@@ -43,6 +49,15 @@ public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
template.sendBody(JMS_QUEUE_NAME, expectedBody);
mock.assertIsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(0, stat.getAcquiredCounter());
+ assertEquals(1, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Test
@@ -54,16 +69,30 @@ public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
template.sendBody(JMS_QUEUE_NAME, "Bye World");
mock.assertIsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(1, stat.getAcquiredCounter());
+ assertEquals(2, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext camelContext = super.createCamelContext();
- camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
+ ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+ ecc.setExchangeFactory(new PooledExchangeFactory());
+ ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+ ecc.getExchangeFactory().setStatisticsEnabled(true);
+ ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
- camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
- return camelContext;
+ ecc.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+
+ return ecc;
}
@Override
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
index f9428d18e28..ea5e647857d 100644
--- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
@@ -22,6 +22,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.MethodOrderer;
@@ -36,9 +38,14 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext camelContext = super.createCamelContext();
- camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
- return camelContext;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+ ecc.setExchangeFactory(new PooledExchangeFactory());
+ ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+ ecc.getExchangeFactory().setStatisticsEnabled(true);
+ ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+ return ecc;
}
@Order(1)
@@ -52,6 +59,15 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
assertEquals("Bye World", out);
assertMockEndpointsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(0, stat.getAcquiredCounter());
+ assertEquals(1, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Order(2)
@@ -72,6 +88,15 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
});
assertMockEndpointsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(2, stat.getAcquiredCounter());
+ assertEquals(3, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Override
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
index 4596541366b..c57b09343bf 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
@@ -16,13 +16,20 @@
*/
package org.apache.camel.component.sjms.consumer;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.sjms.support.JmsTestSupport;
import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class InOnlyPooledExchangeTest extends JmsTestSupport {
@@ -31,9 +38,14 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
- CamelContext context = super.createCamelContext();
- context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
- return context;
+ ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+ ecc.setExchangeFactory(new PooledExchangeFactory());
+ ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+ ecc.getExchangeFactory().setStatisticsEnabled(true);
+ ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+ return ecc;
}
@Test
@@ -46,6 +58,15 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
template.sendBody(SJMS_QUEUE_NAME, expectedBody);
mock.assertIsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(0, stat.getAcquiredCounter());
+ assertEquals(1, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Test
@@ -57,6 +78,15 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
template.sendBody(SJMS_QUEUE_NAME, "Bye World");
mock.assertIsSatisfied();
+
+ Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ PooledObjectFactory.Statistics stat
+ = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+ assertEquals(1, stat.getCreatedCounter());
+ assertEquals(1, stat.getAcquiredCounter());
+ assertEquals(2, stat.getReleasedCounter());
+ assertEquals(0, stat.getDiscardedCounter());
+ });
}
@Override