You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/20 16:33:10 UTC

[camel] 01/02: CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6a95f58c5c43d83492c0ef2ea42db880f5308a04
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jun 20 16:27:16 2022 +0200

    CAMEL-18210: camel-core - Pooled exchanges in batch consumer may use an exchange concurrently
---
 .../main/java/org/apache/camel/PooledExchange.java |  2 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java   | 24 +++++++++++---------
 .../engine/PooledProcessorExchangeFactory.java     | 26 +++++++++++++---------
 .../org/apache/camel/support/DefaultConsumer.java  |  9 ++++++--
 .../camel/support/DefaultPooledExchange.java       |  6 ++---
 6 files changed, 40 insertions(+), 29 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
index 764ed2b2ad8..0c471a78a51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
@@ -45,7 +45,7 @@ public interface PooledExchange extends ExtendedExchange {
      * <p/>
      * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself.
      */
-    void done(boolean forced);
+    void done();
 
     /**
      * Resets the exchange for reuse with the given created timestamp;
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 80b08c3634d..36821bbfc1a 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -259,7 +259,7 @@ public class DefaultUnitOfWork implements UnitOfWork {
             // pooled exchange has its own done logic which will reset this uow for reuse
             // so do not call onDone
             try {
-                ((PooledExchange) exchange).done(false);
+                ((PooledExchange) exchange).done();
             } catch (Throwable e) {
                 // must catch exceptions to ensure synchronizations is also invoked
                 log.warn("Exception occurred during exchange done. This exception will be ignored.", e);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 1f9995617d0..a02ca09b438 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -44,7 +44,7 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
     @Override
     protected void doBuild() throws Exception {
         super.doBuild();
-        // force to create and load the class during build time so the JVM does not
+        // force creating and loading the class during build time so the JVM does not
         // load the class on first exchange to be created
         DefaultPooledExchange dummy = new DefaultPooledExchange(camelContext);
         // force message init to load classes
@@ -75,10 +75,12 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
             if (statisticsEnabled) {
                 statistics.acquired.increment();
             }
-            // reset exchange for reuse
-            PooledExchange ee = (PooledExchange) exchange;
-            ee.reset(System.currentTimeMillis());
         }
+
+        // reset exchange for reuse
+        PooledExchange ee = (PooledExchange) exchange;
+        ee.reset(System.currentTimeMillis());
+
         return exchange;
     }
 
@@ -95,21 +97,21 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
             if (statisticsEnabled) {
                 statistics.acquired.increment();
             }
-            // reset exchange for reuse
-            PooledExchange ee = (PooledExchange) exchange;
-            ee.reset(System.currentTimeMillis());
         }
+
+        // reset exchange for reuse
+        PooledExchange ee = (PooledExchange) exchange;
+        ee.reset(System.currentTimeMillis());
+
         return exchange;
     }
 
     @Override
     public boolean release(Exchange exchange) {
         try {
-            // done exchange before returning back to pool
+            // mark the exchange as done before returning it to the pool
             PooledExchange ee = (PooledExchange) exchange;
-            boolean force = !ee.isAutoRelease();
-            ee.done(force);
-            ee.onDone(null);
+            ee.done();
 
             // only release back in pool if reset was success
             boolean inserted = pool.offer(exchange);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index ed4620d9e6d..48dcc697cbf 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -70,11 +70,12 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
             if (statisticsEnabled) {
                 statistics.acquired.increment();
             }
-            // reset exchange for reuse
-            PooledExchange ee = (PooledExchange) answer;
-            ee.reset(System.currentTimeMillis());
         }
 
+        // reset exchange for reuse
+        PooledExchange ee = (PooledExchange) answer;
+        ee.reset(System.currentTimeMillis());
+
         ExchangeHelper.copyResults(answer, exchange);
         return answer;
     }
@@ -95,11 +96,12 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
             if (statisticsEnabled) {
                 statistics.acquired.increment();
             }
-            // reset exchange for reuse
-            PooledExchange ee = (PooledExchange) answer;
-            ee.reset(System.currentTimeMillis());
         }
 
+        // reset exchange for reuse
+        PooledExchange ee = (PooledExchange) answer;
+        ee.reset(System.currentTimeMillis());
+
         ExchangeHelper.copyResults(answer, exchange);
         // do not reuse message id on copy
         answer.getIn().setMessageId(null);
@@ -125,19 +127,21 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
             if (statisticsEnabled) {
                 statistics.acquired.increment();
             }
-            // reset exchange for reuse
-            PooledExchange ee = (PooledExchange) answer;
-            ee.reset(System.currentTimeMillis());
         }
+
+        // reset exchange for reuse
+        PooledExchange ee = (PooledExchange) answer;
+        ee.reset(System.currentTimeMillis());
+
         return answer;
     }
 
     @Override
     public boolean release(Exchange exchange) {
         try {
-            // done exchange before returning back to pool
+            // mark the exchange as done before returning it to the pool
             PooledExchange ee = (PooledExchange) exchange;
-            ee.done(true);
+            ee.done();
 
             // only release back in pool if reset was success
             boolean inserted = pool.offer(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 8bbf6e3aefc..d3d97b3fa8a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -144,7 +144,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         if (exchange != null) {
             if (!autoRelease && exchange instanceof PooledExchange) {
                 // if not auto release we must manually force done
-                ((PooledExchange) exchange).done(true);
+                ((PooledExchange) exchange).done();
             }
             exchangeFactory.release(exchange);
         }
@@ -263,11 +263,13 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
 
         private final DefaultConsumer consumer;
         private final Exchange exchange;
+        private final boolean pooled;
         private final boolean autoRelease;
 
         public DefaultConsumerCallback(DefaultConsumer consumer, Exchange exchange, boolean autoRelease) {
             this.consumer = consumer;
             this.exchange = exchange;
+            this.pooled = exchange instanceof PooledExchange;
             this.autoRelease = autoRelease;
         }
 
@@ -280,7 +282,10 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
                             exchange.getException());
                 }
             } finally {
-                consumer.releaseExchange(exchange, autoRelease);
+                if (!autoRelease) {
+                    // must release if not auto released
+                    consumer.releaseExchange(exchange, autoRelease);
+                }
             }
         }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index a8aa4574b67..41b53eb0e83 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -31,7 +31,7 @@ import org.apache.camel.PooledExchange;
 public final class DefaultPooledExchange extends AbstractExchange implements PooledExchange {
 
     private OnDoneTask onDone;
-    private Class originalInClassType;
+    private Class<?> originalInClassType;
     private Message originalOut;
     private final ExchangePattern originalPattern;
     private boolean autoRelease;
@@ -79,8 +79,8 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
         this.onDone = task;
     }
 
-    public void done(boolean forced) {
-        if (created > 0 && (forced || autoRelease)) {
+    public void done() {
+        if (created > 0) {
             this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again
             this.properties.clear();
             // reset array by copying over from empty which is a very fast JVM optimized operation