You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/07 16:55:56 UTC
[camel] 01/01: CAMEL-16279: camel-core - Optimize core to reduce
object allocations by pooling reusable tasks in the routing engine.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch pool
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 337b9c579a73cfdae52721f2f04f76ea616681a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Mar 7 17:54:51 2021 +0100
CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooling reusable tasks in the routing engine.
---
.../java/org/apache/camel/spi/ExchangeFactory.java | 86 +-----------
.../apache/camel/spi/InternalProcessorFactory.java | 4 +-
.../org/apache/camel/spi/PooledObjectFactory.java | 125 +++++++++++++++++
.../camel/impl/engine/AbstractCamelContext.java | 3 +
.../camel/impl/engine/PooledExchangeFactory.java | 5 +
.../impl/engine/PrototypeExchangeFactory.java | 111 ++-------------
.../apache/camel/processor/PooledExchangeTask.java | 41 ++++++
.../camel/processor/PooledExchangeTaskFactory.java | 56 ++++++++
.../apache/camel/processor/PooledTaskFactory.java | 64 +++++++++
.../camel/processor/PrototypeTaskFactory.java | 48 +++++++
.../errorhandler/RedeliveryErrorHandler.java | 137 ++++++++++++++-----
.../camel/support/PooledObjectFactorySupport.java | 152 +++++++++++++++++++++
.../support/PrototypeObjectFactorySupport.java | 139 +++++++++++++++++++
13 files changed, 756 insertions(+), 215 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index fcedd54..29b3fdb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,12 +16,10 @@
*/
package org.apache.camel.spi;
-import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.NonManagedService;
-import org.apache.camel.Service;
/**
* Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the
@@ -36,50 +34,7 @@ import org.apache.camel.Service;
* The factory is pluggable which allows to use different strategies. The default factory will create a new
* {@link Exchange} instance, and the pooled factory will pool and reuse exchanges.
*/
-public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService, RouteIdAware {
-
- /**
- * Utilization statistics of the this factory.
- */
- interface Statistics {
-
- /**
- * Number of new exchanges created.
- */
- long getCreatedCounter();
-
- /**
- * Number of exchanges acquired (reused) when using pooled factory.
- */
- long getAcquiredCounter();
-
- /**
- * Number of exchanges released back to pool
- */
- long getReleasedCounter();
-
- /**
- * Number of exchanges discarded (thrown away) such as if no space in cache pool.
- */
- long getDiscardedCounter();
-
- /**
- * Reset the counters
- */
- void reset();
-
- /**
- * Whether statistics is enabled.
- */
- boolean isStatisticsEnabled();
-
- /**
- * Sets whether statistics is enabled.
- *
- * @param statisticsEnabled <tt>true</tt> to enable
- */
- void setStatisticsEnabled(boolean statisticsEnabled);
- }
+public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware {
/**
* Service factory key.
@@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, CamelContextAware, NonManagedS
}
/**
- * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
- */
- int getCapacity();
-
- /**
- * The current number of exchanges in the pool
- */
- int getSize();
-
- /**
- * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
- */
- void setCapacity(int capacity);
-
- /**
- * Whether statistics is enabled.
- */
- boolean isStatisticsEnabled();
-
- /**
- * Whether statistics is enabled.
- */
- void setStatisticsEnabled(boolean statisticsEnabled);
-
- /**
- * Reset the statistics
- */
- void resetStatistics();
-
- /**
- * Purges the internal cache (if pooled)
- */
- void purge();
-
- /**
- * Gets the usage statistics
+ * Whether the factory is pooled.
*/
- Statistics getStatistics();
+ boolean isPooled();
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index b3f8868..8a56f21 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -27,7 +27,9 @@ import org.apache.camel.Route;
/**
* A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is
- * used to have loose coupling between the modules in core. Camel user user should only use {@link ProcessorFactory}.
+ * used to have loose coupling between the modules in core.
+ *
+ * Camel end users should NOT use this, but use {@link ProcessorFactory} instead.
*
* @see ProcessorFactory
*/
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
new file mode 100644
index 0000000..db4c0d1
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
+/**
+ * Factory for pooled objects or tasks.
+ */
+public interface PooledObjectFactory<T> extends Service, CamelContextAware {
+
+ /**
+ * Utilization statistics of this factory.
+ */
+ interface Statistics {
+
+ /**
+ * Number of new objects created.
+ */
+ long getCreatedCounter();
+
+ /**
+ * Number of objects acquired (reused) when using pooled factory.
+ */
+ long getAcquiredCounter();
+
+ /**
+ * Number of objects released back to pool
+ */
+ long getReleasedCounter();
+
+ /**
+ * Number of objects discarded (thrown away) such as if no space in cache pool.
+ */
+ long getDiscardedCounter();
+
+ /**
+ * Reset the counters
+ */
+ void reset();
+
+ /**
+ * Whether statistics is enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ *
+ * @param statisticsEnabled <tt>true</tt> to enable
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+ }
+
+ /**
+ * The current number of objects in the pool
+ */
+ int getSize();
+
+ /**
+ * The capacity the pool uses for storing objects. The default capacity is 100.
+ */
+ int getCapacity();
+
+ /**
+ * The capacity the pool uses for storing objects. The default capacity is 100.
+ */
+ void setCapacity(int capacity);
+
+ /**
+ * Whether statistics is enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
+ /**
+ * Reset the statistics
+ */
+ void resetStatistics();
+
+ /**
+ * Purges the internal cache (if pooled)
+ */
+ void purge();
+
+ /**
+ * Gets the usage statistics
+ */
+ Statistics getStatistics();
+
+ /**
+ * Acquires an object from the pool (if any)
+ *
+ * @return the object or <tt>null</tt> if the pool is empty
+ */
+ T acquire();
+
+ /**
+ * Releases the object back to the pool
+ *
+ * @param t the object
+ * @return true if released into the pool, or false if something went wrong and the object was discarded
+ */
+ boolean release(T t);
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d7e6dd4..45fcf6d 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends BaseService
}
bootstraps.clear();
+ if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) {
+ LOG.info("Pooled mode enabled. Camel pools and reuses objects to reduce JVM object allocations.");
+ }
if (isLightweight()) {
LOG.info("Lightweight mode enabled. Performing optimizations and memory reduction.");
ReifierStrategy.clearReifiers();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 4079a46..b1647b5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -168,6 +168,11 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory {
}
@Override
+ public boolean isPooled() {
+ return true;
+ }
+
+ @Override
protected void doStop() throws Exception {
exchangeFactoryManager.removeExchangeFactory(this);
logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index fb17cb0..eef1ea7 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.impl.engine;
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.spi.ExchangeFactoryManager;
import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.PooledObjectFactorySupport;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory;
/**
* {@link ExchangeFactory} that creates a new {@link Exchange} instance.
*/
-public class PrototypeExchangeFactory extends ServiceSupport implements ExchangeFactory {
+public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchange> implements ExchangeFactory {
private static final Logger LOG = LoggerFactory.getLogger(PrototypeExchangeFactory.class);
- final UtilizationStatistics statistics = new UtilizationStatistics();
final Consumer consumer;
- CamelContext camelContext;
ExchangeFactoryManager exchangeFactoryManager;
String routeId;
@@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
@Override
protected void doBuild() throws Exception {
+ super.doBuild();
this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
}
@@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
}
@Override
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
- }
-
- @Override
public ExchangeFactory newExchangeFactory(Consumer consumer) {
PrototypeExchangeFactory answer = new PrototypeExchangeFactory(consumer);
answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
@@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
}
@Override
+ public Exchange acquire() {
+ throw new UnsupportedOperationException("Not in use");
+ }
+
+ @Override
public Exchange create(boolean autoRelease) {
if (statistics.isStatisticsEnabled()) {
statistics.created.increment();
@@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
}
@Override
- public boolean isStatisticsEnabled() {
- return statistics.isStatisticsEnabled();
- }
-
- @Override
- public void setStatisticsEnabled(boolean statisticsEnabled) {
- statistics.setStatisticsEnabled(statisticsEnabled);
- }
-
- @Override
- public int getCapacity() {
- return 0;
- }
-
- @Override
- public int getSize() {
- return 0;
- }
-
- @Override
- public void setCapacity(int capacity) {
- // not in use
- }
-
- @Override
public void resetStatistics() {
statistics.reset();
}
@Override
- public void purge() {
- // not in use
- }
-
- @Override
- public Statistics getStatistics() {
- return statistics;
+ public boolean isPooled() {
+ return false;
}
@Override
protected void doStart() throws Exception {
+ super.doStart();
if (exchangeFactoryManager != null) {
exchangeFactoryManager.addExchangeFactory(this);
}
@@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
@Override
protected void doStop() throws Exception {
+ super.doStop();
if (exchangeFactoryManager != null) {
exchangeFactoryManager.removeExchangeFactory(this);
}
@@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange
}
}
- /**
- * Represents utilization statistics
- */
- final class UtilizationStatistics implements ExchangeFactory.Statistics {
-
- boolean statisticsEnabled;
- final LongAdder created = new LongAdder();
- final LongAdder acquired = new LongAdder();
- final LongAdder released = new LongAdder();
- final LongAdder discarded = new LongAdder();
-
- @Override
- public void reset() {
- created.reset();
- acquired.reset();
- released.reset();
- discarded.reset();
- }
-
- @Override
- public long getCreatedCounter() {
- return created.longValue();
- }
-
- @Override
- public long getAcquiredCounter() {
- return acquired.longValue();
- }
-
- @Override
- public long getReleasedCounter() {
- return released.longValue();
- }
-
- @Override
- public long getDiscardedCounter() {
- return discarded.longValue();
- }
-
- @Override
- public boolean isStatisticsEnabled() {
- return statisticsEnabled;
- }
-
- @Override
- public void setStatisticsEnabled(boolean statisticsEnabled) {
- this.statisticsEnabled = statisticsEnabled;
- }
- }
-
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
new file mode 100644
index 0000000..d4e0226
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * A task that EIPs and the internal routing engine use to store state when processing an {@link Exchange}.
+ *
+ * @see org.apache.camel.processor.PooledExchangeTaskFactory
+ */
+public interface PooledExchangeTask extends Runnable {
+
+ /**
+ * Prepares the task for the given exchange and its callback
+ *
+ * @param exchange the exchange
+ * @param callback the callback
+ */
+ void prepare(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Resets the task after its done and can be reused for another exchange.
+ */
+ void reset();
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
new file mode 100644
index 0000000..b90e9f5
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.PooledObjectFactory;
+
+/**
+ * Factory to create {@link PooledExchangeTask}.
+ *
+ * @see PooledExchangeTask
+ */
+public interface PooledExchangeTaskFactory extends PooledObjectFactory<PooledExchangeTask> {
+
+ /**
+ * Creates a new task to use for processing the exchange.
+ *
+ * @param exchange the current exchange
+ * @param callback the callback for the exchange
+ * @return the task
+ */
+ PooledExchangeTask create(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Attempts to acquire a pooled task to use for processing the exchange, if not possible then a new task is created.
+ *
+ * @param exchange the current exchange
+ * @param callback the callback for the exchange
+ * @return the task
+ */
+ PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Releases the task after it is done being used
+ *
+ * @param task the task
+ * @return true if the task was released, and false if the task failed to be released or no space in pool, and
+ * the task was discarded.
+ */
+ boolean release(PooledExchangeTask task);
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
new file mode 100644
index 0000000..c1ce72e
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask>
+ implements PooledExchangeTaskFactory {
+
+ @Override
+ public PooledExchangeTask acquire() {
+ return pool.poll();
+ }
+
+ public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+ PooledExchangeTask task = acquire();
+ if (task == null) {
+ if (statistics.isStatisticsEnabled()) {
+ statistics.created.increment();
+ }
+ task = create(exchange, callback);
+ } else {
+ if (statistics.isStatisticsEnabled()) {
+ statistics.acquired.increment();
+ }
+ }
+ task.prepare(exchange, callback);
+ return task;
+ }
+
+ @Override
+ public boolean release(PooledExchangeTask task) {
+ boolean inserted = pool.offer(task);
+ if (statistics.isStatisticsEnabled()) {
+ if (inserted) {
+ statistics.released.increment();
+ } else {
+ statistics.discarded.increment();
+ }
+ }
+ return inserted;
+ }
+
+ @Override
+ public String toString() {
+ return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+ }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
new file mode 100644
index 0000000..0c5c444
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PrototypeObjectFactorySupport;
+
+public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask>
+ implements PooledExchangeTaskFactory {
+
+ @Override
+ public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) {
+ PooledExchangeTask task = create(exchange, callback);
+ task.prepare(exchange, callback);
+ return task;
+ }
+
+ @Override
+ public PooledExchangeTask acquire() {
+ throw new UnsupportedOperationException("Not in use");
+ }
+
+ @Override
+ public boolean release(PooledExchangeTask pooledTask) {
+ // not pooled
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PrototypeTaskFactory";
+ }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 3c0eaf3..39d3658 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -38,6 +38,10 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
@@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class);
+ // factory
+ protected PooledExchangeTaskFactory taskFactory;
+
// state
protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
protected ScheduledExecutorService executorService;
@@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
// Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not)
- Runnable task;
- if (simpleTask) {
- task = new SimpleTask(exchange, callback);
- } else {
- task = new RedeliveryTask(exchange, callback);
- }
+ Runnable task = taskFactory.acquire(exchange, callback);
+
// Run it
if (exchange.isTransacted()) {
reactiveExecutor.scheduleSync(task);
@@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
/**
* Simple task to perform calling the processor with no redelivery support
*/
- protected class SimpleTask implements Runnable, AsyncCallback {
- private final ExtendedExchange exchange;
- private final AsyncCallback callback;
- private boolean first = true;
+ protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback {
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
+ private boolean first;
- SimpleTask(Exchange exchange, AsyncCallback callback) {
+ public SimpleTask() {
+ }
+
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.exchange = (ExtendedExchange) exchange;
this.callback = callback;
+ this.first = true;
}
@Override
@@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
return "SimpleTask";
}
+ public void reset() {
+ this.exchange = null;
+ this.callback = null;
+ this.first = true;
+ }
+
@Override
public void done(boolean doneSync) {
// the run method decides what to do when we are done
@@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
if (exchange.isInterrupted()) {
@@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
}
exchange.setRouteStop(true);
// we should not continue routing so call callback
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
@@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
onExceptionOccurred();
prepareExchangeAfterFailure(exchange);
// we do not support redelivery so continue callback
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
} else if (first) {
// first time call the target processor
first = false;
outputAsync.process(exchange, this);
} else {
// we are done so continue callback
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
}
}
@@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
/**
* Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask)
*/
- protected class RedeliveryTask implements Runnable {
- private final Exchange original;
- private final ExtendedExchange exchange;
- private final AsyncCallback callback;
+ protected class RedeliveryTask implements PooledExchangeTask, Runnable {
+ // state
+ private Exchange original;
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
private int redeliveryCounter;
private long redeliveryDelay;
- private Predicate retryWhilePredicate;
// default behavior which can be overloaded on a per exception basis
+ private Predicate retryWhilePredicate;
private RedeliveryPolicy currentRedeliveryPolicy;
private Processor failureProcessor;
private Processor onRedeliveryProcessor;
@@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
private boolean useOriginalInMessage;
private boolean useOriginalInBody;
- public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
+ public RedeliveryTask() {
+ }
+
+ @Override
+ public String toString() {
+ return "RedeliveryTask";
+ }
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.retryWhilePredicate = retryWhilePolicy;
this.currentRedeliveryPolicy = redeliveryPolicy;
this.handledPredicate = getDefaultHandledPredicate();
@@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
this.useOriginalInBody = useOriginalBodyPolicy;
this.onRedeliveryProcessor = redeliveryProcessor;
this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
-
// do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
// original Exchange is being redelivered, and not a mutated Exchange
this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null;
@@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
}
@Override
- public String toString() {
- return "RedeliveryTask";
+ public void reset() {
+ this.retryWhilePredicate = null;
+ this.currentRedeliveryPolicy = null;
+ this.handledPredicate = null;
+ this.continuedPredicate = null;
+ this.useOriginalInMessage = false;
+ this.useOriginalInBody = false;
+ this.onRedeliveryProcessor = null;
+ this.onExceptionProcessor = null;
+ this.original = null;
+ this.exchange = null;
+ this.callback = null;
+ this.redeliveryCounter = 0;
+ this.redeliveryDelay = 0;
}
/**
@@ -635,7 +677,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
@@ -644,7 +688,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
} catch (Throwable e) {
// unexpected exception during running so break out
exchange.setException(e);
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
}
}
@@ -804,7 +850,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
if (isDone(exchange)) {
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
} else {
// error occurred so loop back around which we do by invoking the processAsyncErrorHandler
reactiveExecutor.schedule(this);
@@ -1107,7 +1155,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
}
} finally {
// if the fault was handled asynchronously, this should be reflected in the callback as well
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
}
});
} else {
@@ -1126,7 +1176,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
prepareExchangeAfterFailure(exchange, isDeadLetterChannel, shouldHandle, shouldContinue);
} finally {
// callback we are done
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
}
}
@@ -1503,8 +1555,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
@Override
protected void doStart() throws Exception {
- ServiceHelper.startService(output, outputAsync, deadLetter);
-
// determine if redeliver is enabled or not
redeliveryEnabled = determineIfRedeliveryIsEnabled();
if (LOG.isTraceEnabled()) {
@@ -1531,6 +1581,28 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
// however if we dont then its less memory overhead (and a bit less cpu) of using the simple task
simpleTask = deadLetter == null && !redeliveryEnabled && (exceptionPolicies == null || exceptionPolicies.isEmpty())
&& onPrepareProcessor == null;
+
+ boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+ if (pooled) {
+ taskFactory = new PooledTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return simpleTask ? new SimpleTask() : new RedeliveryTask();
+ }
+ };
+ int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory.setCapacity(capacity);
+ } else {
+ taskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
+ return simpleTask ? new SimpleTask() : new RedeliveryTask();
+ }
+ };
+ }
+ LOG.trace("Using TaskFactory: {}", taskFactory);
+
+ ServiceHelper.startService(taskFactory, output, outputAsync, deadLetter);
}
@Override
@@ -1542,6 +1614,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
+ ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, taskFactory);
}
+
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
new file mode 100644
index 0000000..91c56cd
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Base class for {@link PooledObjectFactory} implementations backed by a bounded {@link BlockingQueue}.
+ * <p/>
+ * The pool is allocated in {@link #doBuild()} with the capacity configured at that point, so a capacity change made
+ * after the service has been built does not resize the existing pool.
+ */
+public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+
+    protected CamelContext camelContext;
+    // created in doBuild(), so it is null until the service has been built
+    protected BlockingQueue<T> pool;
+    protected int capacity = 100;
+
+    /**
+     * Allocates the pool with the currently configured capacity.
+     */
+    @Override
+    protected void doBuild() throws Exception {
+        super.doBuild();
+        this.pool = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    /**
+     * Number of objects currently held in the pool, or 0 when the pool has not been created yet.
+     */
+    @Override
+    public int getSize() {
+        return pool != null ? pool.size() : 0;
+    }
+
+    @Override
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Sets the capacity used when the pool is created. The value is only read in {@link #doBuild()}.
+     */
+    @Override
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    @Override
+    public void resetStatistics() {
+        statistics.reset();
+    }
+
+    /**
+     * Removes all pooled objects. Does nothing when the pool has not been created yet.
+     */
+    @Override
+    public void purge() {
+        clearPool();
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return statistics;
+    }
+
+    /**
+     * Resets the statistics and empties the pool.
+     */
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        statistics.reset();
+        clearPool();
+    }
+
+    /**
+     * Empties the pool if it exists; the pool is only created in doBuild(), so it can still be null here.
+     */
+    private void clearPool() {
+        if (pool != null) {
+            pool.clear();
+        }
+    }
+
+    /**
+     * Represents utilization statistics
+     */
+    protected final class UtilizationStatistics implements PooledObjectFactory.Statistics {
+
+        boolean statisticsEnabled;
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+
+        @Override
+        public void reset() {
+            created.reset();
+            acquired.reset();
+            released.reset();
+            discarded.reset();
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return created.longValue();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return acquired.longValue();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return released.longValue();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return discarded.longValue();
+        }
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
new file mode 100644
index 0000000..5df6bbf
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Base class for {@link org.apache.camel.spi.PooledObjectFactory} implementations that do not pool: size and
+ * capacity are reported as 0, and capacity changes and purge requests are ignored.
+ */
+public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> {
+
+    protected final UtilizationStatistics statistics = new UtilizationStatistics();
+    private CamelContext camelContext;
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return this.camelContext;
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean statisticsEnabled) {
+        this.statistics.setStatisticsEnabled(statisticsEnabled);
+    }
+
+    @Override
+    public boolean isStatisticsEnabled() {
+        return this.statistics.isStatisticsEnabled();
+    }
+
+    @Override
+    public Statistics getStatistics() {
+        return this.statistics;
+    }
+
+    @Override
+    public void resetStatistics() {
+        this.statistics.reset();
+    }
+
+    /**
+     * Always 0, as no objects are pooled.
+     */
+    @Override
+    public int getSize() {
+        return 0;
+    }
+
+    /**
+     * Always 0, as there is no pool.
+     */
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        // ignored: there is no pool to size
+    }
+
+    @Override
+    public void purge() {
+        // nothing to purge: there is no pool
+    }
+
+    /**
+     * Clears the statistics counters after the parent shutdown logic has run.
+     */
+    @Override
+    protected void doShutdown() throws Exception {
+        super.doShutdown();
+        this.statistics.reset();
+    }
+
+    /**
+     * Holds the created, acquired, released and discarded counters.
+     */
+    protected final class UtilizationStatistics implements Statistics {
+
+        public final LongAdder created = new LongAdder();
+        public final LongAdder acquired = new LongAdder();
+        public final LongAdder released = new LongAdder();
+        public final LongAdder discarded = new LongAdder();
+        boolean statisticsEnabled;
+
+        @Override
+        public boolean isStatisticsEnabled() {
+            return this.statisticsEnabled;
+        }
+
+        @Override
+        public void setStatisticsEnabled(boolean statisticsEnabled) {
+            this.statisticsEnabled = statisticsEnabled;
+        }
+
+        @Override
+        public long getCreatedCounter() {
+            return this.created.sum();
+        }
+
+        @Override
+        public long getAcquiredCounter() {
+            return this.acquired.sum();
+        }
+
+        @Override
+        public long getReleasedCounter() {
+            return this.released.sum();
+        }
+
+        @Override
+        public long getDiscardedCounter() {
+            return this.discarded.sum();
+        }
+
+        @Override
+        public void reset() {
+            this.created.reset();
+            this.acquired.reset();
+            this.released.reset();
+            this.discarded.reset();
+        }
+    }
+
+}