You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/09 14:25:07 UTC
[camel] 01/01: CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch opt-consumer
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e9cfc8ba8ae3fa6b5badcd46f8ecbe170d9d2da3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Mar 9 15:05:11 2021 +0100
CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations.
---
.../camel/component/timer/TimerConsumer.java | 14 +++---------
.../src/main/java/org/apache/camel/Consumer.java | 15 +++++++++++++
.../org/apache/camel/support/DefaultConsumer.java | 25 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 7c201bd..74479e6 100644
--- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -204,17 +204,9 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S
}
if (!endpoint.isSynchronous()) {
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean cbDoneSync) {
- // handle any thrown exception
- if (exchange.getException() != null) {
- TimerConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange,
- exchange.getException());
- }
- TimerConsumer.this.releaseExchange(exchange, false);
- }
- });
+ // use default consumer callback
+ AsyncCallback cb = defaultConsumerCallback(exchange, false);
+ getAsyncProcessor().process(exchange, cb);
} else {
try {
getProcessor().process(exchange);
diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 25c24f6..b1a6ba6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -47,8 +47,23 @@ public interface Consumer extends Service, EndpointAware {
/**
* Releases the {@link Exchange} when its completed processing and no longer needed.
*
+ * @param exchange the exchange
* @param autoRelease whether the exchange was created with auto release
*/
void releaseExchange(Exchange exchange, boolean autoRelease);
+ /**
+ * The default callback to use with the consumer when calling the processor using asynchronous routing.
+ *
+ * This implementation will use {@link org.apache.camel.spi.ExceptionHandler} to handle any exception on the
+ * exchange and afterwards release the exchange.
+ *
+ * @param exchange the exchange
+ * @param autoRelease whether the exchange was created with auto release
+ * @return the default callback
+ */
+ default AsyncCallback defaultConsumerCallback(Exchange exchange, boolean autoRelease) {
+ return null;
+ }
+
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 67f838d..762a9c5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.support;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
@@ -48,6 +51,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
private final Processor processor;
private final AsyncProcessor asyncProcessor;
private final ExchangeFactory exchangeFactory;
+ private final AtomicReference<AsyncCallback> pooledCallback = new AtomicReference<>();
private ExceptionHandler exceptionHandler;
private Route route;
private String routeId;
@@ -145,6 +149,26 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
}
@Override
+ public AsyncCallback defaultConsumerCallback(Exchange exchange, boolean autoRelease) {
+ boolean pooled = exchangeFactory.isPooled();
+ AsyncCallback answer = pooled ? pooledCallback.get() : null;
+ if (answer == null) {
+ answer = doneSync -> {
+ // handle any thrown exception
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange,
+ exchange.getException());
+ }
+ releaseExchange(exchange, autoRelease);
+ };
+ if (pooled) {
+ pooledCallback.set(answer);
+ }
+ }
+ return answer;
+ }
+
+ @Override
public Endpoint getEndpoint() {
return endpoint;
}
@@ -199,6 +223,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
protected void doShutdown() throws Exception {
LOG.debug("Shutting down consumer: {}", this);
ServiceHelper.stopAndShutdownServices(exchangeFactory, processor);
+ pooledCallback.set(null);
}
/**