You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 19:42:06 UTC

[camel] 03/03: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f2eb5cb8decf18a98228ca2a75417bf8ce768b15
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 21:41:47 2022 +0200

    CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
 .../component/jms/JmsInOnlyPooledExchangeTest.java | 37 +++++++++++++++++++---
 .../http/NettyHttpSimplePooledExchangeTest.java    | 31 ++++++++++++++++--
 .../sjms/consumer/InOnlyPooledExchangeTest.java    | 36 +++++++++++++++++++--
 3 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
index e05dadd1aad..60f87e05a70 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyPooledExchangeTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelContext;
@@ -23,10 +25,14 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
 
@@ -43,6 +49,15 @@ public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
         template.sendBody(JMS_QUEUE_NAME, expectedBody);
 
         mock.assertIsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(0, stat.getAcquiredCounter());
+            assertEquals(1, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Test
@@ -54,16 +69,30 @@ public class JmsInOnlyPooledExchangeTest extends CamelTestSupport {
         template.sendBody(JMS_QUEUE_NAME, "Bye World");
 
         mock.assertIsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(1, stat.getAcquiredCounter());
+            assertEquals(2, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext camelContext = super.createCamelContext();
-        camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
+        ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+        ecc.setExchangeFactory(new PooledExchangeFactory());
+        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+        ecc.getExchangeFactory().setStatisticsEnabled(true);
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
 
         ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory();
-        camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
-        return camelContext;
+        ecc.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory));
+
+        return ecc;
     }
 
     @Override
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
index f9428d18e28..ea5e647857d 100644
--- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettyHttpSimplePooledExchangeTest.java
@@ -22,6 +22,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.MethodOrderer;
@@ -36,9 +38,14 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext camelContext = super.createCamelContext();
-        camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
-        return camelContext;
+        ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+        ecc.setExchangeFactory(new PooledExchangeFactory());
+        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+        ecc.getExchangeFactory().setStatisticsEnabled(true);
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+        return ecc;
     }
 
     @Order(1)
@@ -52,6 +59,15 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
         assertEquals("Bye World", out);
 
         assertMockEndpointsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(0, stat.getAcquiredCounter());
+            assertEquals(1, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Order(2)
@@ -72,6 +88,15 @@ public class NettyHttpSimplePooledExchangeTest extends BaseNettyTest {
                 });
 
         assertMockEndpointsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(2, stat.getAcquiredCounter());
+            assertEquals(3, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Override
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
index 4596541366b..c57b09343bf 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
@@ -16,13 +16,20 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
 import org.apache.camel.impl.engine.PooledExchangeFactory;
+import org.apache.camel.impl.engine.PooledProcessorExchangeFactory;
+import org.apache.camel.spi.PooledObjectFactory;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class InOnlyPooledExchangeTest extends JmsTestSupport {
 
@@ -31,9 +38,14 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        CamelContext context = super.createCamelContext();
-        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
-        return context;
+        ExtendedCamelContext ecc = (ExtendedCamelContext) super.createCamelContext();
+
+        ecc.setExchangeFactory(new PooledExchangeFactory());
+        ecc.setProcessorExchangeFactory(new PooledProcessorExchangeFactory());
+        ecc.getExchangeFactory().setStatisticsEnabled(true);
+        ecc.getProcessorExchangeFactory().setStatisticsEnabled(true);
+
+        return ecc;
     }
 
     @Test
@@ -46,6 +58,15 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
         template.sendBody(SJMS_QUEUE_NAME, expectedBody);
 
         mock.assertIsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(0, stat.getAcquiredCounter());
+            assertEquals(1, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Test
@@ -57,6 +78,15 @@ public class InOnlyPooledExchangeTest extends JmsTestSupport {
         template.sendBody(SJMS_QUEUE_NAME, "Bye World");
 
         mock.assertIsSatisfied();
+
+        Awaitility.waitAtMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            PooledObjectFactory.Statistics stat
+                    = context.adapt(ExtendedCamelContext.class).getExchangeFactoryManager().getStatistics();
+            assertEquals(1, stat.getCreatedCounter());
+            assertEquals(1, stat.getAcquiredCounter());
+            assertEquals(2, stat.getReleasedCounter());
+            assertEquals(0, stat.getDiscardedCounter());
+        });
     }
 
     @Override