You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 05:38:07 UTC
[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5c336a91e9ad8f827a17ca99de3e59aa240ab770
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 18 06:36:52 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../src/main/java/org/apache/camel/Ordered.java | 2 +-
.../camel/impl/engine/PooledExchangeFactory.java | 70 +++++++++++++++++++++-
.../camel/impl/engine/SimpleCamelContext.java | 4 +-
3 files changed, 73 insertions(+), 3 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/Ordered.java b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
index beb50a5..df5cddec 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Ordered.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Ordered.java
@@ -29,7 +29,7 @@ public interface Ordered {
/**
* The lowest precedence
*/
- int LOWEST = Integer.MAX_VALUE;
+ int LOWEST = Integer.MAX_VALUE - 1000; // reserve for internal use
/**
* Gets the order.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index ced186e..88142c9 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -17,11 +17,22 @@
package org.apache.camel.impl.engine;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.StaticService;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.service.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
@@ -30,9 +41,16 @@ import org.apache.camel.support.service.ServiceSupport;
public class PooledExchangeFactory extends ServiceSupport
implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
+ private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
+
+ private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
+ private final AtomicLong acquired = new AtomicLong();
+ private final AtomicLong created = new AtomicLong();
+ private final AtomicLong released = new AtomicLong();
private CamelContext camelContext;
+ private boolean statisticsEnabled = true;
@Override
public CamelContext getCamelContext() {
@@ -44,17 +62,33 @@ public class PooledExchangeFactory extends ServiceSupport
this.camelContext = camelContext;
}
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+
@Override
public Exchange create() {
Exchange exchange = pool.poll();
if (exchange == null) {
+ if (statisticsEnabled) {
+ created.incrementAndGet();
+ }
// create a new exchange as there was no free from the pool
exchange = new DefaultExchange(camelContext);
} else {
+ if (statisticsEnabled) {
+ acquired.incrementAndGet();
+ }
// reset exchange before we use it
ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
ee.reset();
}
+ // register the completion callback that returns the exchange to the pool when it is done
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
return exchange;
}
@@ -62,23 +96,57 @@ public class PooledExchangeFactory extends ServiceSupport
public Exchange create(Endpoint fromEndpoint) {
Exchange exchange = pool.poll();
if (exchange == null) {
+ if (statisticsEnabled) {
+ created.incrementAndGet();
+ }
// create a new exchange as there was no free from the pool
exchange = new DefaultExchange(fromEndpoint);
} else {
+ if (statisticsEnabled) {
+ acquired.incrementAndGet();
+ }
// need to mark this exchange from the given endpoint
exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
}
+ // register the completion callback that returns the exchange to the pool when it is done
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
return exchange;
}
@Override
public void release(Exchange exchange) {
+ if (statisticsEnabled) {
+ released.incrementAndGet();
+ }
pool.offer(exchange);
}
@Override
protected void doStop() throws Exception {
pool.clear();
+
+ if (statisticsEnabled) {
+ LOG.info("PooledExchangeFactory usage [created: {}, acquired: {}, released: {}]", created.get(), acquired.get(),
+ released.get());
+ }
+
+ created.set(0);
+ acquired.set(0);
+ released.set(0);
+ }
+
+ private final class ReleaseOnCompletion extends SynchronizationAdapter {
+
+ @Override
+ public int getOrder() {
+ // should run as the very last completion, so use the highest possible order value
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void onDone(Exchange exchange) {
+ release(exchange);
+ }
}
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index b91710c..e373e43 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -551,7 +551,9 @@ public class SimpleCamelContext extends AbstractCamelContext {
ExchangeFactory.FACTORY,
ExchangeFactory.class);
- return result.orElseGet(DefaultExchangeFactory::new);
+ // TODO: experiment - fall back to PooledExchangeFactory instead of DefaultExchangeFactory
+ // return result.orElseGet(DefaultExchangeFactory::new);
+ return result.orElseGet(PooledExchangeFactory::new);
}
@Override