You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 05:38:07 UTC

[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5c336a91e9ad8f827a17ca99de3e59aa240ab770
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 06:36:52 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../src/main/java/org/apache/camel/Ordered.java    |  2 +-
 .../camel/impl/engine/PooledExchangeFactory.java   | 70 +++++++++++++++++++++-
 .../camel/impl/engine/SimpleCamelContext.java      |  4 +-
 3 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Ordered.java b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
index beb50a5..df5cddec 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Ordered.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
@@ -29,7 +29,7 @@ public interface Ordered {
     /**
      * The lowest precedence
      */
-    int LOWEST = Integer.MAX_VALUE;
+    int LOWEST = Integer.MAX_VALUE - 1000; // reserve for internal use
 
     /**
      * Gets the order.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index ced186e..88142c9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -17,11 +17,22 @@
 package org.apache.camel.impl.engine;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
@@ -30,9 +41,16 @@ import org.apache.camel.support.service.ServiceSupport;
 public class PooledExchangeFactory extends ServiceSupport
         implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
+
+    private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
     private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
+    private final AtomicLong acquired = new AtomicLong();
+    private final AtomicLong created = new AtomicLong();
+    private final AtomicLong released = new AtomicLong();
 
     private CamelContext camelContext;
+    private boolean statisticsEnabled = true;
 
     @Override
     public CamelContext getCamelContext() {
@@ -44,17 +62,33 @@ public class PooledExchangeFactory extends ServiceSupport
         this.camelContext = camelContext;
     }
 
+    public boolean isStatisticsEnabled() {
+        return statisticsEnabled;
+    }
+
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statisticsEnabled = statisticsEnabled;
+    }
+
     @Override
     public Exchange create() {
         Exchange exchange = pool.poll();
         if (exchange == null) {
+            if (statisticsEnabled) {
+                created.incrementAndGet();
+            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultExchange(camelContext);
         } else {
+            if (statisticsEnabled) {
+                acquired.incrementAndGet();
+            }
             // reset exchange before we use it
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
             ee.reset();
         }
+        // register an on-completion callback that returns the exchange to the pool when it is done
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         return exchange;
     }
 
@@ -62,23 +96,57 @@ public class PooledExchangeFactory extends ServiceSupport
     public Exchange create(Endpoint fromEndpoint) {
         Exchange exchange = pool.poll();
         if (exchange == null) {
+            if (statisticsEnabled) {
+                created.incrementAndGet();
+            }
             // create a new exchange as there was no free from the pool
             exchange = new DefaultExchange(fromEndpoint);
         } else {
+            if (statisticsEnabled) {
+                acquired.incrementAndGet();
+            }
             // need to mark this exchange from the given endpoint
             exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
         }
+        // register an on-completion callback that returns the exchange to the pool when it is done
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         return exchange;
     }
 
     @Override
     public void release(Exchange exchange) {
+        if (statisticsEnabled) {
+            released.incrementAndGet();
+        }
         pool.offer(exchange);
     }
 
     @Override
     protected void doStop() throws Exception {
         pool.clear();
+
+        if (statisticsEnabled) {
+            LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}]", created.get(), acquired.get(),
+                    released.get());
+        }
+
+        created.set(0);
+        acquired.set(0);
+        released.set(0);
+    }
+
+    private final class ReleaseOnCompletion extends SynchronizationAdapter {
+
+        @Override
+        public int getOrder() {
+            // should run last of all, so use the highest possible order value
+            return Integer.MAX_VALUE;
+        }
+
+        @Override
+        public void onDone(Exchange exchange) {
+            release(exchange);
+        }
     }
 
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index b91710c..e373e43 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -551,7 +551,9 @@ public class SimpleCamelContext extends AbstractCamelContext {
                 ExchangeFactory.FACTORY,
                 ExchangeFactory.class);
 
-        return result.orElseGet(DefaultExchangeFactory::new);
+        // TODO: experiment: default to PooledExchangeFactory instead of DefaultExchangeFactory
+        //        return result.orElseGet(DefaultExchangeFactory::new);
+        return result.orElseGet(PooledExchangeFactory::new);
     }
 
     @Override