You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/20 16:19:00 UTC
[camel] 04/05: CAMEL-16222: PooledExchangeFactory experiment
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f6d0603c6098bc210ef13ff1befc408b04eb40e6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 20 17:07:51 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../java/org/apache/camel/spi/ExchangeFactory.java | 11 ++++
.../camel/impl/engine/DefaultExchangeFactory.java | 18 ++++++-
.../camel/impl/engine/PooledExchangeFactory.java | 63 ++++++++++------------
3 files changed, 55 insertions(+), 37 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 9bc469f..197d473 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -72,4 +72,15 @@ public interface ExchangeFactory {
default boolean release(Exchange exchange) {
return true;
}
+
+ /**
+ * Whether statistics is enabled.
+ *
+ * @return whether statistics is enabled
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ *
+ * @param statisticsEnabled whether statistics should be enabled
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index a8db865..469fb7c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -16,14 +16,18 @@
*/
package org.apache.camel.impl.engine;
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.support.DefaultExchange;
/**
* Default {@link ExchangeFactory} that creates a new {@link Exchange} instance.
*/
-public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
+public final class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware {
private CamelContext camelContext;
@@ -53,4 +57,14 @@ public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAwar
return new DefaultExchange(fromEndpoint);
}
+ /**
+ * Always returns {@code false} for this factory.
+ */
+ @Override
+ public boolean isStatisticsEnabled() {
+ return false;
+ }
+
+ /**
+ * Intentionally a no-op: the given value is ignored by this factory.
+ */
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ // not in use
+ }
+
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index c5b51b4..4b228eb0 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,10 +19,17 @@ package org.apache.camel.impl.engine;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.PooledExchange;
+import org.apache.camel.StaticService;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.support.DefaultPooledExchange;
-import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
@@ -32,7 +39,7 @@ import org.slf4j.LoggerFactory;
* Pooled {@link ExchangeFactory} that reuses {@link Exchange} instance from a pool.
*/
@Experimental
-public class PooledExchangeFactory extends ServiceSupport
+public final class PooledExchangeFactory extends ServiceSupport
implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
// TODO: optimize onDone lambdas as they will be created per instance, and we can use static linked
@@ -40,7 +47,6 @@ public class PooledExchangeFactory extends ServiceSupport
private static final Logger LOG = LoggerFactory.getLogger(PooledExchangeFactory.class);
private final Consumer consumer;
- private final ReleaseOnCompletion onCompletion = new ReleaseOnCompletion();
private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
private final AtomicLong acquired = new AtomicLong();
private final AtomicLong created = new AtomicLong();
@@ -48,7 +54,7 @@ public class PooledExchangeFactory extends ServiceSupport
private final AtomicLong discarded = new AtomicLong();
private CamelContext camelContext;
- private boolean statisticsEnabled = true;
+ private boolean statisticsEnabled;
public PooledExchangeFactory() {
this.consumer = null;
@@ -91,13 +97,7 @@ public class PooledExchangeFactory extends ServiceSupport
created.incrementAndGet();
}
// create a new exchange as there was no free from the pool
- PooledExchange answer = new DefaultPooledExchange(camelContext);
- answer.setAutoRelease(autoRelease);
- if (autoRelease) {
- // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
- answer.onDone(this::release);
- }
- return answer;
+ exchange = createPooledExchange(null, autoRelease);
} else {
if (statisticsEnabled) {
acquired.incrementAndGet();
@@ -117,13 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport
created.incrementAndGet();
}
// create a new exchange as there was no free from the pool
- PooledExchange answer = new DefaultPooledExchange(fromEndpoint);
- answer.setAutoRelease(autoRelease);
- if (autoRelease) {
- // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created
- answer.onDone(this::release);
- }
- return answer;
+ // use the shared helper so auto release and the onDone release task are configured for endpoint-based exchanges too
+ exchange = createPooledExchange(fromEndpoint, autoRelease);
} else {
if (statisticsEnabled) {
acquired.incrementAndGet();
@@ -158,6 +152,21 @@ public class PooledExchangeFactory extends ServiceSupport
}
}
+ /**
+ * Creates a new pooled exchange, bound to the given endpoint when one is supplied and to the camel context otherwise.
+ *
+ * @param fromEndpoint the endpoint to create the exchange from, or {@code null} to create it from the camel context
+ * @param autoRelease whether the exchange is marked as auto release and gets a done task that releases it via this factory
+ * @return the newly created pooled exchange
+ */
+ protected PooledExchange createPooledExchange(Endpoint fromEndpoint, boolean autoRelease) {
+ final PooledExchange pooled = fromEndpoint == null
+ ? new DefaultPooledExchange(camelContext)
+ : new DefaultPooledExchange(fromEndpoint);
+ pooled.setAutoRelease(autoRelease);
+ if (!autoRelease) {
+ // no release task is registered when auto release is off
+ return pooled;
+ }
+ // the consumer will either always be in auto release mode or not, so it's safe to initialize the task only once when the exchange is created
+ pooled.onDone(this::release);
+ return pooled;
+ }
+
@Override
protected void doStop() throws Exception {
pool.clear();
@@ -180,20 +189,4 @@ public class PooledExchangeFactory extends ServiceSupport
discarded.set(0);
}
- private final class ReleaseOnCompletion extends SynchronizationAdapter {
-
- @Override
- public int getOrder() {
- // should be very very last so set as highest value possible
- return Integer.MAX_VALUE;
- }
-
- @Override
- public void onDone(Exchange exchange) {
- if (exchange != null) {
- release(exchange);
- }
- }
- }
-
}