You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/04/24 18:25:59 UTC
[1/3] git commit: CAMEL-7393: Recipient list and routing slip eip
allows to set cache size for producer cache. So you can control this,
or turn it off.
Repository: camel
Updated Branches:
refs/heads/camel-2.12.x 8857bd0be -> 124835080
refs/heads/camel-2.13.x ad39395d5 -> 22ba8062a
refs/heads/master 9105dd7f2 -> 7f05a94e3
CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7f05a94e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7f05a94e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7f05a94e
Branch: refs/heads/master
Commit: 7f05a94e33253f7817f1619ed5dd48236ee7de06
Parents: 9105dd7
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:17:26 2014 +0200
----------------------------------------------------------------------
.../apache/camel/impl/EmptyProducerCache.java | 59 +++++++++++++++++++
.../camel/model/RecipientListDefinition.java | 25 +++++++-
.../camel/model/RoutingSlipDefinition.java | 28 ++++++++-
.../apache/camel/processor/RecipientList.java | 27 ++++++++-
.../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
.../camel/impl/EmptyProducerCacheTest.java | 61 ++++++++++++++++++++
.../processor/RecipientListNoCacheTest.java | 58 +++++++++++++++++++
.../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
8 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+ public EmptyProducerCache(Object source, CamelContext camelContext) {
+ super(source, camelContext, 0);
+ }
+
+ @Override
+ public Producer acquireProducer(Endpoint endpoint) {
+ // always create a new producer
+ Producer answer = null;
+ try {
+ answer = endpoint.createProducer();
+ // must then start service so producer is ready to be used
+ ServiceHelper.startService(answer);
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+ return answer;
+ }
+
+ @Override
+ public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+ // stop and shutdown the producer as its not cache or reused
+ ServiceHelper.stopAndShutdownService(producer);
+ }
+
+ @Override
+ public String toString() {
+ return "EmptyProducerCache for source: " + getSource();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
private Processor onPrepare;
@XmlAttribute
private Boolean shareUnitOfWork;
+ @XmlAttribute
+ private Integer cacheSize;
public RecipientListDefinition() {
}
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
answer.setParallelProcessing(isParallelProcessing());
answer.setStreaming(isStreaming());
answer.setShareUnitOfWork(isShareUnitOfWork());
+ if (getCacheSize() != null) {
+ answer.setCacheSize(getCacheSize());
+ }
if (onPrepareRef != null) {
onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
}
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return this;
}
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
// Properties
//-------------------------------------------------------------------------
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return shareUnitOfWork != null && shareUnitOfWork;
}
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
private String uriDelimiter;
@XmlAttribute
private Boolean ignoreInvalidEndpoints;
+ @XmlAttribute
+ private Integer cacheSize;
public RoutingSlipDefinition() {
this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
if (getIgnoreInvalidEndpoints() != null) {
routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
}
+ if (getCacheSize() != null) {
+ routingSlip.setCacheSize(getCacheSize());
+ }
return routingSlip;
}
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
public Boolean getIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
-
+
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
setUriDelimiter(uriDelimiter);
return this;
}
+
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index b1b85ed..abb635f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.processor;
-import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
@@ -27,14 +26,18 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -47,6 +50,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* @version
*/
public class RecipientList extends ServiceSupport implements AsyncProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
private static final String IGNORE_DELIMITER_MARKER = "false";
private final CamelContext camelContext;
private ProducerCache producerCache;
@@ -57,6 +62,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
private boolean ignoreInvalidEndpoints;
private boolean streaming;
private long timeout;
+ private int cacheSize;
private Processor onPrepare;
private boolean shareUnitOfWork;
private ExecutorService executorService;
@@ -171,7 +177,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startServices(aggregationStrategy, producerCache);
}
@@ -267,4 +282,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
public void setShareUnitOfWork(boolean shareUnitOfWork) {
this.shareUnitOfWork = shareUnitOfWork;
}
+
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
import org.apache.camel.Traceable;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected ProducerCache producerCache;
+ protected int cacheSize;
protected boolean ignoreInvalidEndpoints;
protected String header;
protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
@Override
public String toString() {
return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startService(producerCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+ public void testEmptyCache() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ Endpoint e = context.getEndpoint("direct:queue:1");
+ Producer p = cache.acquireProducer(e);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.releaseProducer(e, p);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.stop();
+ }
+
+ public void testCacheProducerAcquireAndRelease() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ for (int i = 0; i < 1003; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ Producer p = cache.acquireProducer(e);
+ cache.releaseProducer(e, p);
+ }
+
+ assertEquals("Size should be 1000", 0, cache.size());
+ cache.stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").recipientList(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").routingSlip(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}
[2/3] git commit: CAMEL-7393: Recipient list and routing slip eip
allows to set cache size for producer cache. So you can control this,
or turn it off.
Posted by da...@apache.org.
CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.
Conflicts:
camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12483508
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12483508
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12483508
Branch: refs/heads/camel-2.12.x
Commit: 124835080e8e0e80341a7d7673608a366ba3e26e
Parents: 8857bd0
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:19:56 2014 +0200
----------------------------------------------------------------------
.../apache/camel/impl/EmptyProducerCache.java | 59 +++++++++++++++++++
.../camel/model/RecipientListDefinition.java | 25 +++++++-
.../camel/model/RoutingSlipDefinition.java | 28 ++++++++-
.../apache/camel/processor/RecipientList.java | 24 +++++++-
.../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
.../camel/impl/EmptyProducerCacheTest.java | 61 ++++++++++++++++++++
.../processor/RecipientListNoCacheTest.java | 58 +++++++++++++++++++
.../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
8 files changed, 330 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+ public EmptyProducerCache(Object source, CamelContext camelContext) {
+ super(source, camelContext, 0);
+ }
+
+ @Override
+ public Producer acquireProducer(Endpoint endpoint) {
+ // always create a new producer
+ Producer answer = null;
+ try {
+ answer = endpoint.createProducer();
+ // must then start service so producer is ready to be used
+ ServiceHelper.startService(answer);
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+ return answer;
+ }
+
+ @Override
+ public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+ // stop and shutdown the producer as its not cache or reused
+ ServiceHelper.stopAndShutdownService(producer);
+ }
+
+ @Override
+ public String toString() {
+ return "EmptyProducerCache for source: " + getSource();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
private Processor onPrepare;
@XmlAttribute
private Boolean shareUnitOfWork;
+ @XmlAttribute
+ private Integer cacheSize;
public RecipientListDefinition() {
}
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
answer.setParallelProcessing(isParallelProcessing());
answer.setStreaming(isStreaming());
answer.setShareUnitOfWork(isShareUnitOfWork());
+ if (getCacheSize() != null) {
+ answer.setCacheSize(getCacheSize());
+ }
if (onPrepareRef != null) {
onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
}
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return this;
}
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
// Properties
//-------------------------------------------------------------------------
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return shareUnitOfWork != null && shareUnitOfWork;
}
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
private String uriDelimiter;
@XmlAttribute
private Boolean ignoreInvalidEndpoints;
+ @XmlAttribute
+ private Integer cacheSize;
public RoutingSlipDefinition() {
this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
if (getIgnoreInvalidEndpoints() != null) {
routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
}
+ if (getCacheSize() != null) {
+ routingSlip.setCacheSize(getCacheSize());
+ }
return routingSlip;
}
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
public Boolean getIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
-
+
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
setUriDelimiter(uriDelimiter);
return this;
}
+
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 4b700a9..94652f3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -26,6 +26,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -34,6 +35,8 @@ import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -46,6 +49,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* @version
*/
public class RecipientList extends ServiceSupport implements AsyncProcessor {
+ private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
private final CamelContext camelContext;
private ProducerCache producerCache;
private Expression expression;
@@ -55,6 +59,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
private boolean ignoreInvalidEndpoints;
private boolean streaming;
private long timeout;
+ private int cacheSize;
private Processor onPrepare;
private boolean shareUnitOfWork;
private ExecutorService executorService;
@@ -163,7 +168,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startServices(aggregationStrategy, producerCache);
}
@@ -259,4 +273,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
public void setShareUnitOfWork(boolean shareUnitOfWork) {
this.shareUnitOfWork = shareUnitOfWork;
}
+
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
import org.apache.camel.Traceable;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected ProducerCache producerCache;
+ protected int cacheSize;
protected boolean ignoreInvalidEndpoints;
protected String header;
protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
@Override
public String toString() {
return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startService(producerCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+ public void testEmptyCache() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ Endpoint e = context.getEndpoint("direct:queue:1");
+ Producer p = cache.acquireProducer(e);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.releaseProducer(e, p);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.stop();
+ }
+
+ public void testCacheProducerAcquireAndRelease() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ for (int i = 0; i < 1003; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ Producer p = cache.acquireProducer(e);
+ cache.releaseProducer(e, p);
+ }
+
+ assertEquals("Size should be 1000", 0, cache.size());
+ cache.stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").recipientList(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").routingSlip(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}
[3/3] git commit: CAMEL-7393: Recipient list and routing slip eip
allows to set cache size for producer cache. So you can control this,
or turn it off.
Posted by da...@apache.org.
CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/22ba8062
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/22ba8062
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/22ba8062
Branch: refs/heads/camel-2.13.x
Commit: 22ba8062a0f56abd49f16ce6e1af8af276161bba
Parents: ad39395
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:25:41 2014 +0200
----------------------------------------------------------------------
.../apache/camel/impl/EmptyProducerCache.java | 59 +++++++++++++++++++
.../camel/model/RecipientListDefinition.java | 25 +++++++-
.../camel/model/RoutingSlipDefinition.java | 28 ++++++++-
.../apache/camel/processor/RecipientList.java | 27 ++++++++-
.../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
.../camel/impl/EmptyProducerCacheTest.java | 61 ++++++++++++++++++++
.../processor/RecipientListNoCacheTest.java | 58 +++++++++++++++++++
.../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
8 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+ public EmptyProducerCache(Object source, CamelContext camelContext) {
+ super(source, camelContext, 0);
+ }
+
+ @Override
+ public Producer acquireProducer(Endpoint endpoint) {
+ // always create a new producer
+ Producer answer = null;
+ try {
+ answer = endpoint.createProducer();
+ // must then start service so producer is ready to be used
+ ServiceHelper.startService(answer);
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+ return answer;
+ }
+
+ @Override
+ public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+ // stop and shutdown the producer as its not cache or reused
+ ServiceHelper.stopAndShutdownService(producer);
+ }
+
+ @Override
+ public String toString() {
+ return "EmptyProducerCache for source: " + getSource();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
private Processor onPrepare;
@XmlAttribute
private Boolean shareUnitOfWork;
+ @XmlAttribute
+ private Integer cacheSize;
public RecipientListDefinition() {
}
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
answer.setParallelProcessing(isParallelProcessing());
answer.setStreaming(isStreaming());
answer.setShareUnitOfWork(isShareUnitOfWork());
+ if (getCacheSize() != null) {
+ answer.setCacheSize(getCacheSize());
+ }
if (onPrepareRef != null) {
onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
}
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return this;
}
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
// Properties
//-------------------------------------------------------------------------
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
return shareUnitOfWork != null && shareUnitOfWork;
}
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
private String uriDelimiter;
@XmlAttribute
private Boolean ignoreInvalidEndpoints;
+ @XmlAttribute
+ private Integer cacheSize;
public RoutingSlipDefinition() {
this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
if (getIgnoreInvalidEndpoints() != null) {
routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
}
+ if (getCacheSize() != null) {
+ routingSlip.setCacheSize(getCacheSize());
+ }
return routingSlip;
}
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
public Boolean getIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
-
+
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
setUriDelimiter(uriDelimiter);
return this;
}
+
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when using this recipient list, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index b1b85ed..abb635f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.processor;
-import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
@@ -27,14 +26,18 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -47,6 +50,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* @version
*/
public class RecipientList extends ServiceSupport implements AsyncProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
private static final String IGNORE_DELIMITER_MARKER = "false";
private final CamelContext camelContext;
private ProducerCache producerCache;
@@ -57,6 +62,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
private boolean ignoreInvalidEndpoints;
private boolean streaming;
private long timeout;
+ private int cacheSize;
private Processor onPrepare;
private boolean shareUnitOfWork;
private ExecutorService executorService;
@@ -171,7 +177,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startServices(aggregationStrategy, producerCache);
}
@@ -267,4 +282,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
public void setShareUnitOfWork(boolean shareUnitOfWork) {
this.shareUnitOfWork = shareUnitOfWork;
}
+
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
import org.apache.camel.Traceable;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
protected final Logger log = LoggerFactory.getLogger(getClass());
protected ProducerCache producerCache;
+ protected int cacheSize;
protected boolean ignoreInvalidEndpoints;
protected String header;
protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
@Override
public String toString() {
return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
protected void doStart() throws Exception {
if (producerCache == null) {
- producerCache = new ProducerCache(this, camelContext);
+ if (cacheSize < 0) {
+ producerCache = new EmptyProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} is not using ProducerCache", this);
+ } else if (cacheSize == 0) {
+ producerCache = new ProducerCache(this, camelContext);
+ log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+ } else {
+ producerCache = new ProducerCache(this, camelContext, cacheSize);
+ log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+ }
}
ServiceHelper.startService(producerCache);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+ public void testEmptyCache() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ Endpoint e = context.getEndpoint("direct:queue:1");
+ Producer p = cache.acquireProducer(e);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.releaseProducer(e, p);
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ cache.stop();
+ }
+
+ public void testCacheProducerAcquireAndRelease() throws Exception {
+ ProducerCache cache = new EmptyProducerCache(this, context);
+ cache.start();
+
+ assertEquals("Size should be 0", 0, cache.size());
+
+ // we never cache any producers
+ for (int i = 0; i < 1003; i++) {
+ Endpoint e = context.getEndpoint("direct:queue:" + i);
+ Producer p = cache.acquireProducer(e);
+ cache.releaseProducer(e, p);
+ }
+
+ assertEquals("Size should be 1000", 0, cache.size());
+ cache.stop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").recipientList(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+ public void testNoCache() throws Exception {
+ MockEndpoint x = getMockEndpoint("mock:x");
+ MockEndpoint y = getMockEndpoint("mock:y");
+ MockEndpoint z = getMockEndpoint("mock:z");
+
+ x.expectedBodiesReceived("foo", "bar");
+ y.expectedBodiesReceived("foo", "bar");
+ z.expectedBodiesReceived("foo", "bar");
+
+ sendBody("foo");
+ sendBody("bar");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected void sendBody(String body) {
+ template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+ "mock:x,mock:y,mock:z");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:a").routingSlip(
+ header("recipientListHeader").tokenize(",")).cacheSize(0);
+ }
+ };
+
+ }
+
+}