You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/03/09 14:25:07 UTC

[camel] 01/01: CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch opt-consumer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e9cfc8ba8ae3fa6b5badcd46f8ecbe170d9d2da3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Mar 9 15:05:11 2021 +0100

    CAMEL-16319: camel-core - Optimize consumer default done callback to reduce object allocations.
---
 .../camel/component/timer/TimerConsumer.java       | 14 +++---------
 .../src/main/java/org/apache/camel/Consumer.java   | 15 +++++++++++++
 .../org/apache/camel/support/DefaultConsumer.java  | 25 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 7c201bd..74479e6 100644
--- a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -204,17 +204,9 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener, S
         }
 
         if (!endpoint.isSynchronous()) {
-            getAsyncProcessor().process(exchange, new AsyncCallback() {
-                @Override
-                public void done(boolean cbDoneSync) {
-                    // handle any thrown exception
-                    if (exchange.getException() != null) {
-                        TimerConsumer.this.getExceptionHandler().handleException("Error processing exchange", exchange,
-                                exchange.getException());
-                    }
-                    TimerConsumer.this.releaseExchange(exchange, false);
-                }
-            });
+            // use default consumer callback
+            AsyncCallback cb = defaultConsumerCallback(exchange, false);
+            getAsyncProcessor().process(exchange, cb);
         } else {
             try {
                 getProcessor().process(exchange);
diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 25c24f6..b1a6ba6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -47,8 +47,23 @@ public interface Consumer extends Service, EndpointAware {
     /**
      * Releases the {@link Exchange} when it has completed processing and is no longer needed.
      *
+     * @param exchange    the exchange
      * @param autoRelease whether the exchange was created with auto release
      */
     void releaseExchange(Exchange exchange, boolean autoRelease);
 
+    /**
+     * The default callback to use with the consumer when calling the processor using asynchronous routing.
+     * <p>
+     * The intended callback uses {@link org.apache.camel.spi.ExceptionHandler} to handle any exception on the
+     * exchange and afterwards releases the exchange. Note that the default method body declared on this
+     * interface does none of that: it returns {@code null}, so the behavior is only available from
+     * implementations that override this method.
+     *
+     * @param  exchange    the exchange
+     * @param  autoRelease whether the exchange was created with auto release
+     * @return             the default callback, or {@code null} when this method is not overridden
+     */
+    default AsyncCallback defaultConsumerCallback(Exchange exchange, boolean autoRelease) {
+        return null;
+    }
+
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 67f838d..762a9c5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.support;
 
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -48,6 +51,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     private final Processor processor;
     private final AsyncProcessor asyncProcessor;
     private final ExchangeFactory exchangeFactory;
+    private final AtomicReference<AsyncCallback> pooledCallback = new AtomicReference<>();
     private ExceptionHandler exceptionHandler;
     private Route route;
     private String routeId;
@@ -145,6 +149,26 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     }
 
     @Override
+    public AsyncCallback defaultConsumerCallback(Exchange exchange, boolean autoRelease) {
+        boolean pooled = exchangeFactory.isPooled();
+        AsyncCallback answer = pooled ? pooledCallback.get() : null;
+        if (answer == null) {
+            answer = doneSync -> {
+                // handle any thrown exception
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange,
+                            exchange.getException());
+                }
+                releaseExchange(exchange, autoRelease);
+            };
+            if (pooled) {
+                pooledCallback.set(answer);
+            }
+        }
+        return answer;
+    }
+
+    /**
+     * Returns the endpoint stored in this consumer's {@code endpoint} field.
+     */
+    @Override
     public Endpoint getEndpoint() {
         return endpoint;
     }
@@ -199,6 +223,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     protected void doShutdown() throws Exception {
         LOG.debug("Shutting down consumer: {}", this);
         ServiceHelper.stopAndShutdownServices(exchangeFactory, processor);
+        // clear the pooledCallback reference so this consumer no longer holds on to it after shutdown
+        pooledCallback.set(null);
     }
 
     /**