You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/02 08:45:17 UTC
[camel] branch master updated: CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9bcb94a CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation
9bcb94a is described below
commit 9bcb94a4903d6c9235d6bc85733b4d990a5da6a3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jan 2 09:44:12 2020 +0100
CAMEL-14342: Add cacheSize option to @RecipientList EIP annotation
---
.../src/main/java/org/apache/camel/RecipientList.java | 6 ++++++
.../impl/engine/DefaultAnnotationBasedProcessorFactory.java | 1 +
.../org/apache/camel/processor/RecipientListProcessor.java | 13 ++-----------
3 files changed, 9 insertions(+), 11 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/RecipientList.java b/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
index ae8157b..3e77b51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
+++ b/core/camel-api/src/main/java/org/apache/camel/RecipientList.java
@@ -120,6 +120,12 @@ public @interface RecipientList {
long timeout() default 0;
/**
+ * Sets the maximum size used by the producer cache which is used to cache and
+ * reuse producers when using this recipient list, when uris are reused.
+ */
+ int cacheSize() default 0;
+
+ /**
* Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
* This can be used to deep-clone messages that should be send, or any custom logic needed before
* the exchange is send.
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
index ed2201f..6aeb5e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAnnotationBasedProcessorFactory.java
@@ -50,6 +50,7 @@ public final class DefaultAnnotationBasedProcessorFactory implements AnnotationB
recipientList.setParallelAggregate(annotation.parallelAggregate());
recipientList.setStreaming(annotation.streaming());
recipientList.setTimeout(annotation.timeout());
+ recipientList.setCacheSize(annotation.cacheSize());
recipientList.setShareUnitOfWork(annotation.shareUnitOfWork());
if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 0d0b87c..8e1d65b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -32,7 +32,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -40,6 +39,7 @@ import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,13 +158,6 @@ public class RecipientListProcessor extends MulticastProcessor {
}
public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
- boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
- boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
- this(camelContext, producerCache, iter, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
- shareUnitOfWork, parallelAggregate, false);
- }
-
- public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<?> iter, AggregationStrategy aggregationStrategy,
boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException,
long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
@@ -272,9 +265,7 @@ public class RecipientListProcessor extends MulticastProcessor {
@Override
protected void doStart() throws Exception {
super.doStart();
- if (producerCache == null) {
- producerCache = new DefaultProducerCache(this, getCamelContext(), 0);
- }
+ ObjectHelper.notNull(producerCache, "producerCache", this);
ServiceHelper.startService(producerCache);
}