You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/09 11:42:26 UTC

[camel] 01/02: CAMEL-16462: camel-core - Optimize Splitter EIP to reduce object allocations.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6011aa0bb482efa4a4a12db94b05f6e4da881275
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Apr 9 13:14:31 2021 +0200

    CAMEL-16462: camel-core - Optimize Splitter EIP to reduce object allocations.
---
 .../apache/camel/spi/ProcessorExchangeFactory.java |  9 +++-
 .../engine/PooledProcessorExchangeFactory.java     | 50 ++++++++++++++++------
 .../engine/PrototypeProcessorExchangeFactory.java  |  5 +++
 .../apache/camel/processor/MulticastProcessor.java |  9 +---
 .../java/org/apache/camel/processor/Splitter.java  | 19 +++++---
 5 files changed, 65 insertions(+), 27 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
index c48ad15..214635f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ProcessorExchangeFactory.java
@@ -57,7 +57,14 @@ public interface ProcessorExchangeFactory extends PooledObjectFactory<Exchange>,
     /**
      * Gets a copy of the given {@link Exchange}
      *
-     * @param exchange original copy of the exchange
+     * @param exchange original exchange
+     */
+    Exchange createCopy(Exchange exchange);
+
+    /**
+     * Gets a copy of the given {@link Exchange} the the copy is correlated to the source
+     *
+     * @param exchange original exchange
      * @param handover whether the on completion callbacks should be handed over to the new copy.
      */
     Exchange createCorrelatedCopy(Exchange exchange, boolean handover);
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
index 1826659..fe3c1d2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledProcessorExchangeFactory.java
@@ -58,26 +58,36 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
     }
 
     @Override
-    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
-        Exchange answer = pool.poll();
+    public Exchange createCopy(Exchange exchange) {
+        ExtendedExchange answer = (ExtendedExchange) pool.poll();
         if (answer == null) {
+            if (statisticsEnabled) {
+                statistics.created.increment();
+            }
             // create a new exchange as there was no free from the pool
-            PooledExchange pe = new DefaultPooledExchange(exchange);
-            ExchangeHelper.copyResults(pe, exchange);
-            // do not reuse message id on copy
-            pe.getIn().setMessageId(null);
-            // do not share the unit of work
-            pe.setUnitOfWork(null);
-            if (handover) {
-                // Need to hand over the completion for async invocation
-                pe.adapt(ExtendedExchange.class).handoverCompletions(exchange);
+            answer = new DefaultPooledExchange(exchange);
+        } else {
+            if (statisticsEnabled) {
+                statistics.acquired.increment();
             }
-            // set a correlation id so we can track back the original exchange
-            pe.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
+            // reset exchange for reuse
+            PooledExchange ee = (PooledExchange) answer;
+            ee.reset(System.currentTimeMillis());
+        }
+
+        ExchangeHelper.copyResults(answer, exchange);
+        return answer;
+    }
+
+    @Override
+    public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        ExtendedExchange answer = (ExtendedExchange) pool.poll();
+        if (answer == null) {
             if (statisticsEnabled) {
                 statistics.created.increment();
             }
-            answer = pe;
+            // create a new exchange as there was no free from the pool
+            answer = new DefaultPooledExchange(exchange);
         } else {
             if (statisticsEnabled) {
                 statistics.acquired.increment();
@@ -86,6 +96,18 @@ public class PooledProcessorExchangeFactory extends PrototypeProcessorExchangeFa
             PooledExchange ee = (PooledExchange) answer;
             ee.reset(System.currentTimeMillis());
         }
+
+        ExchangeHelper.copyResults(answer, exchange);
+        // do not reuse message id on copy
+        answer.getIn().setMessageId(null);
+        // do not share the unit of work
+        answer.setUnitOfWork(null);
+        if (handover) {
+            // Need to hand over the completion for async invocation
+            answer.handoverCompletions(exchange);
+        }
+        // set a correlation id so we can track back the original exchange
+        answer.setProperty(ExchangePropertyKey.CORRELATION_ID, exchange.getExchangeId());
         return answer;
     }
 
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
index 3a4a1f1..65de8f4 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeProcessorExchangeFactory.java
@@ -81,6 +81,11 @@ public class PrototypeProcessorExchangeFactory extends PooledObjectFactorySuppor
     }
 
     @Override
+    public Exchange createCopy(Exchange exchange) {
+        return exchange.copy();
+    }
+
+    @Override
     public Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
         return ExchangeHelper.createCorrelatedCopy(exchange, handover);
     }
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 39e59a7..199a392 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -215,13 +215,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         this.shareUnitOfWork = shareUnitOfWork;
         this.parallelAggregate = parallelAggregate;
         this.stopOnAggregateException = stopOnAggregateException;
-        if (this instanceof Splitter) {
-            // not supported for splitter
-            this.processorExchangeFactory = null;
-        } else {
-            this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class)
-                    .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
-        }
+        this.processorExchangeFactory = camelContext.adapt(ExtendedCamelContext.class)
+                .getProcessorExchangeFactory().newProcessorExchangeFactory(this);
     }
 
     @Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 64fd105..a90091d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -190,7 +190,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
         // this avoids any side effect reflected upon the incoming exchange
         final Object value;
         final Iterator<?> iterator;
-        private final Exchange copy;
+        private Exchange copy;
         private final Route route;
         private final Exchange original;
 
@@ -251,7 +251,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
                     if (part != null) {
                         // create a correlated copy as the new exchange to be routed in the splitter from the copy
                         // and do not share the unit of work
-                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+                        Exchange newExchange = processorExchangeFactory.createCorrelatedCopy(copy, false);
                         // If the splitter has an aggregation strategy
                         // then the StreamCache created by the child routes must not be
                         // closed by the unit of work of the child route, but by the unit of
@@ -284,7 +284,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
 
         @Override
         public void close() throws IOException {
-            IOHelper.closeIterator(value);
+            if (copy != null) {
+                processorExchangeFactory.release(copy);
+                // null copy to avoid releasing it back again as close may be called multiple times
+                copy = null;
+                IOHelper.closeIterator(value);
+            }
         }
 
     }
@@ -337,8 +342,12 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
         return expression;
     }
 
-    private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
-        Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
+    private Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
+        Exchange answer = processorExchangeFactory.createCopy(exchange);
+        if (preserveExchangeId) {
+            // must preserve exchange id
+            answer.setExchangeId(exchange.getExchangeId());
+        }
         if (exchange.getContext().isMessageHistory()) {
             // we do not want to copy the message history for splitted sub-messages
             answer.removeProperty(ExchangePropertyKey.MESSAGE_HISTORY);