You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/02 08:45:17 UTC

[camel] branch master updated: CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bcb94a  CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation
9bcb94a is described below

commit 9bcb94a4903d6c9235d6bc85733b4d990a5da6a3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 2 09:44:12 2020 +0100

    CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation
---
 .../src/main/java/org/apache/camel/RecipientList.java       |  6 ++++++
 .../impl/engine/DefaultAnnotationBasedProcessorFactory.java |  1 +
 .../org/apache/camel/processor/RecipientListProcessor.java  | 13 ++-----------
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/RecipientList.java b/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
index ae8157b..3e77b51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
+++ b/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
@@ -120,6 +120,12 @@ public @interface RecipientList {
     long timeout() default 0;
 
     /**
+     * Sets the maximum size of the producer cache, which caches and reuses
+     * producers when this recipient list sends to the same endpoint URIs repeatedly.
+     */
+    int cacheSize() default 0;
+
+    /**
      * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
      * This can be used to deep-clone messages that should be sent, or any custom logic needed before
      * the exchange is sent.
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
index ed2201f..6aeb5e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
@@ -50,6 +50,7 @@ public final class DefaultAnnotationBasedProcessorFactory implements AnnotationB
         recipientList.setParallelAggregate(annotation.parallelAggregate());
         recipientList.setStreaming(annotation.streaming());
         recipientList.setTimeout(annotation.timeout());
+        recipientList.setCacheSize(annotation.cacheSize());
         recipientList.setShareUnitOfWork(annotation.shareUnitOfWork());
 
         if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 0d0b87c..8e1d65b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,7 +32,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.engine.DefaultProducerCache;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -40,6 +39,7 @@ import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,13 +158,6 @@ public class RecipientListProcessor extends MulticastProcessor {
     }
 
     public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
-                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
-                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
-        this(camelContext, producerCache, iter, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
-             shareUnitOfWork, parallelAggregate, false);
-    }
-
-    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException,
                                   long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
         super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
@@ -272,9 +265,7 @@ public class RecipientListProcessor extends MulticastProcessor {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (producerCache == null) {
-            producerCache = new DefaultProducerCache(this, getCamelContext(), 0);
-        }
+        ObjectHelper.notNull(producerCache, "producerCache", this);
         ServiceHelper.startService(producerCache);
     }