You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/07 07:40:23 UTC

[camel] branch master updated (e8a7afd -> 0d9227f)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from e8a7afd  Update as2-component.adoc
     new 11fdf6d  Revert CAMEL-16462: camel-core - Optimize RecipientList EIP for default delimiter usage.
     new 0d9227f  CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/builder/BuilderSupport.java   |  7 +++
 .../apache/camel/processor/MulticastProcessor.java |  9 ++-
 .../org/apache/camel/processor/RecipientList.java  | 70 ++++++++--------------
 .../camel/processor/RecipientListProcessor.java    | 13 ++--
 .../java/org/apache/camel/processor/Splitter.java  |  5 +-
 5 files changed, 47 insertions(+), 57 deletions(-)

[camel] 01/02: Revert CAMEL-16462: camel-core - Optimize RecipientList EIP for default delimiter usage.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 11fdf6d4cbdfc3355bb3c5edbd1c05c03d4236fd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 09:06:09 2021 +0200

    Revert CAMEL-16462: camel-core - Optimize RecipientList EIP for default delimiter usage.
---
 .../org/apache/camel/builder/BuilderSupport.java   |  7 ++++++
 .../org/apache/camel/processor/RecipientList.java  | 26 +++++++---------------
 2 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/BuilderSupport.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/BuilderSupport.java
index 15cfde4..83504d1 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/builder/BuilderSupport.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/BuilderSupport.java
@@ -102,6 +102,13 @@ public abstract class BuilderSupport {
     }
 
     /**
+     * Returns a constant expression value builder
+     */
+    public ValueBuilder constant(Object... value) {
+        return Builder.constant(value);
+    }
+
+    /**
      * Returns a JOOR expression value builder
      */
     public ValueBuilder joor(String value) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index c45167a..d8b2321 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
@@ -191,25 +190,16 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
      * Sends the given exchange to the recipient list
      */
     public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
-        Iterator<?> iter = null;
-
-        if (recipientList instanceof String && delimiter != null && !delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
-            // optimize for fast iterator
-            String str = (String) recipientList;
-            if (delimiter.length() == 1) {
-                int count = StringHelper.countChar(str, delimiter.charAt(0)) + 1;
-                String[] parts = StringHelper.splitOnCharacter(str, delimiter, count);
-                iter = Arrays.asList((Object[]) parts).iterator();
-            }
-        }
-        if (iter == null) {
-            if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
-                iter = ObjectHelper.createIterator(recipientList, null);
-            } else {
-                iter = ObjectHelper.createIterator(recipientList, delimiter);
-            }
+        Iterator<?> iter;
+
+        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
+            iter = ObjectHelper.createIterator(recipientList, null);
+        } else {
+            iter = ObjectHelper.createIterator(recipientList, delimiter);
         }
 
+        // TODO: Do not create a new processor per exchange
+        // TODO: Store iter on exchange property to be used when creating the pairs
         RecipientListProcessor rlp = new RecipientListProcessor(
                 exchange.getContext(), null, producerCache, iter, getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),

[camel] 02/02: CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0d9227ff16fb00e047fdd087740c87cce01bb545
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Apr 7 09:39:18 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../apache/camel/processor/MulticastProcessor.java |  9 +++-
 .../org/apache/camel/processor/RecipientList.java  | 50 ++++++++--------------
 .../camel/processor/RecipientListProcessor.java    | 13 +++---
 .../java/org/apache/camel/processor/Splitter.java  |  5 ++-
 4 files changed, 35 insertions(+), 42 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6f35e5f..c010738 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,9 +320,13 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        return process(exchange, callback, null);
+    }
+
+    protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter) {
         Iterable<ProcessorExchangePair> pairs;
         try {
-            pairs = createProcessorExchangePairs(exchange);
+            pairs = createProcessorExchangePairs(exchange, iter);
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
@@ -913,7 +917,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
     }
 
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<>(processors.size());
 
         StreamCache streamCache = null;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index d8b2321..78ef5bc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -75,6 +75,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
     private boolean shutdownExecutorService;
     private volatile ExecutorService aggregateExecutorService;
     private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
+    private RecipientListProcessor recipientListProcessor;
 
     public RecipientList(CamelContext camelContext) {
         // use comma by default as delimiter
@@ -198,31 +199,8 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
             iter = ObjectHelper.createIterator(recipientList, delimiter);
         }
 
-        // TODO: Do not create a new processor per exchange
-        // TODO: Store iter on exchange property to be used when creating the pairs
-        RecipientListProcessor rlp = new RecipientListProcessor(
-                exchange.getContext(), null, producerCache, iter, getAggregationStrategy(),
-                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
-                isStopOnAggregateException());
-        rlp.setErrorHandler(errorHandler);
-        rlp.setAggregateExecutorService(aggregateExecutorService);
-        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
-        rlp.setCacheSize(getCacheSize());
-        rlp.setId(getId());
-        rlp.setRouteId(getRouteId());
-
-        // start ourselves
-        try {
-            ServiceHelper.startService(rlp);
-        } catch (Exception e) {
-            exchange.setException(e);
-            callback.done(true);
-            return true;
-        }
-
         // now let the multicast process the exchange
-        return rlp.process(exchange, callback);
+        return recipientListProcessor.process(exchange, callback, iter);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
@@ -230,15 +208,12 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
     }
 
     @Override
-    protected void doInit() throws Exception {
+    protected void doStart() throws Exception {
         if (errorHandler == null) {
             // NoErrorHandler is the default base error handler if none has been configured
             errorHandler = new NoErrorHandler(null);
         }
-    }
 
-    @Override
-    protected void doStart() throws Exception {
         if (producerCache == null) {
             if (cacheSize < 0) {
                 producerCache = new EmptyProducerCache(this, camelContext);
@@ -253,17 +228,30 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
             aggregateExecutorService
                     = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "RecipientList-AggregateTask", 0);
         }
-        ServiceHelper.startService(aggregationStrategy, producerCache);
+
+        recipientListProcessor = new RecipientListProcessor(
+                camelContext, null, producerCache, getAggregationStrategy(),
+                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
+                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
+                isStopOnAggregateException());
+        recipientListProcessor.setErrorHandler(errorHandler);
+        recipientListProcessor.setAggregateExecutorService(aggregateExecutorService);
+        recipientListProcessor.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+        recipientListProcessor.setCacheSize(getCacheSize());
+        recipientListProcessor.setId(getId());
+        recipientListProcessor.setRouteId(getRouteId());
+
+        ServiceHelper.startService(aggregationStrategy, producerCache, recipientListProcessor);
     }
 
     @Override
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(producerCache, aggregationStrategy);
+        ServiceHelper.stopService(producerCache, aggregationStrategy, recipientListProcessor);
     }
 
     @Override
     protected void doShutdown() throws Exception {
-        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
+        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy, recipientListProcessor);
 
         if (aggregateExecutorService != null) {
             camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index e1dacf3..f72eede 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 public class RecipientListProcessor extends MulticastProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
-    private final Iterator<?> iter;
     private boolean ignoreInvalidEndpoints;
     private final ProducerCache producerCache;
     private int cacheSize;
@@ -149,20 +148,18 @@ public class RecipientListProcessor extends MulticastProcessor {
 
     // TODO: camel-bean @RecipientList cacheSize
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter) {
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache) {
         super(camelContext, route, null);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy) {
         super(camelContext, route, null, aggregationStrategy);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
-    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, Iterator<?> iter,
+    public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException,
@@ -172,7 +169,6 @@ public class RecipientListProcessor extends MulticastProcessor {
               streaming, stopOnException, timeout, onPrepare,
               shareUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.producerCache = producerCache;
-        this.iter = iter;
     }
 
     public int getCacheSize() {
@@ -192,7 +188,8 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
         // here we iterate the recipient lists and create the exchange pair for each of those
         List<ProcessorExchangePair> result = new ArrayList<>();
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 58851cc..217b5bf 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter)
+            throws Exception {
+        // iter is only currently used by Recipient List EIP so its null
+
         Object value = expression.evaluate(exchange, Object.class);
         if (exchange.getException() != null) {
             // force any exceptions occurred during evaluation to be thrown