You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/20 16:19:00 UTC

[camel] 04/05: CAMEL-16222: PooledExchangeFactory experiment

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f6d0603c6098bc210ef13ff1befc408b04eb40e6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 17:07:51 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/spi/ExchangeFactory.java | 11 ++++
 .../camel/impl/engine/DefaultExchangeFactory.java  | 18 ++++++-
 .../camel/impl/engine/PooledExchangeFactory.java   | 63 ++++++++++------------
 3 files changed, 55 insertions(+), 37 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 9bc469f..197d473 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -72,4 +72,15 @@ public interface ExchangeFactory {
     default boolean release(Exchange exchange) {
         return true;
     }
+
+    /**
+     * Whether statistics is enabled.
+     *
+     * @return {@code true} if statistics is enabled, {@code false} otherwise
+     */
+    boolean isStatisticsEnabled();
+
+    /**
+     * Sets whether statistics is enabled.
+     *
+     * @param statisticsEnabled {@code true} to enable statistics, {@code false} to disable
+     */
+    void setStatisticsEnabled(boolean statisticsEnabled);
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index a8db865..469fb7c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,14 +16,18 @@
  */
 package org.apache.camel.impl.engine;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 
 /**
  * Default {@link ExchangeFactory} that creates a new {@link Exchange} instance.
  */
-public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
+public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
 
     private CamelContext camelContext;
 
@@ -53,4 +57,14 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar
         return new DefaultExchange(fromEndpoint);
     }
 
+    /**
+     * Always reports statistics as disabled: this implementation returns a constant {@code false}.
+     */
+    @Override
+    public boolean isStatisticsEnabled() {
+        return false;
+    }
+
+    /**
+     * Accepts the flag to satisfy the {@link ExchangeFactory} contract but does nothing with it.
+     *
+     * @param statisticsEnabled ignored by this implementation
+     */
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        // not in use
+    }
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index c5b51b4..4b228eb0 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,10 +19,17 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultPooledExchange;
-import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
@@ -32,7 +39,7 @@ import org.slf4j.LoggerFactory;
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
  */
 @Experimental
-public class PooledExchangeFactory extends ServiceSupport
+public final class PooledExchangeFactory extends ServiceSupport
         implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
 
     // TODO: optimize onDone lambdas as they will be created per instance, and we can use static linked
@@ -40,7 +47,6 @@ public class PooledExchangeFactory extends ServiceSupport
     private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
 
     private final Consumer consumer;
-    private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
     private final AtomicLong acquired = new AtomicLong();
     private final AtomicLong created = new AtomicLong();
@@ -48,7 +54,7 @@ public class PooledExchangeFactory extends ServiceSupport
     private final AtomicLong discarded = new AtomicLong();
 
     private CamelContext camelContext;
-    private boolean statisticsEnabled = true;
+    private boolean statisticsEnabled;
 
     public PooledExchangeFactory() {
         this.consumer = null;
@@ -91,13 +97,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            PooledExchange answer = new DefaultPooledExchange(camelContext);
-            answer.setAutoRelease(autoRelease);
-            if (autoRelease) {
-                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
-                answer.onDone(this::release);
-            }
-            return answer;
+            exchange = createPooledExchange(null, autoRelease);
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
@@ -117,13 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            PooledExchange answer = new DefaultPooledExchange(fromEndpoint);
-            answer.setAutoRelease(autoRelease);
-            if (autoRelease) {
-                // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
-                answer.onDone(this::release);
-            }
-            return answer;
+            // use the shared helper so the auto release flag and the onDone release task are configured
+            exchange = createPooledExchange(fromEndpoint, autoRelease);
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
@@ -158,6 +152,21 @@ public class PooledExchangeFactory extends ServiceSupport
         }
     }
 
+    /**
+     * Creates a new {@link PooledExchange} configured for this factory.
+     *
+     * @param  fromEndpoint the endpoint the exchange is created from, or {@code null} to create the exchange
+     *                      from the {@link CamelContext} of this factory instead
+     * @param  autoRelease  whether the exchange is marked as auto release; when {@code true} the
+     *                      {@code release} method of this factory is also registered as its on-done task
+     * @return              the newly created exchange
+     */
+    protected PooledExchange createPooledExchange(Endpoint fromEndpoint, boolean autoRelease) {
+        // assigned exactly once in either branch, so the compiler verifies it is never left unset
+        final PooledExchange answer;
+        if (fromEndpoint != null) {
+            answer = new DefaultPooledExchange(fromEndpoint);
+        } else {
+            answer = new DefaultPooledExchange(camelContext);
+        }
+        answer.setAutoRelease(autoRelease);
+        if (autoRelease) {
+            // the consumer will either always be in auto release mode or not, so it's safe to initialize the task only once when the exchange is created
+            answer.onDone(this::release);
+        }
+        return answer;
+    }
+
     @Override
     protected void doStop() throws Exception {
         pool.clear();
@@ -180,20 +189,4 @@ public class PooledExchangeFactory extends ServiceSupport
         discarded.set(0);
     }
 
-    private final class ReleaseOnCompletion extends SynchronizationAdapter {
-
-        @Override
-        public int getOrder() {
-            // should be very very last so set as highest value possible
-            return Integer.MAX_VALUE;
-        }
-
-        @Override
-        public void onDone(Exchange exchange) {
-            if (exchange != null) {
-                release(exchange);
-            }
-        }
-    }
-
 }