You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/04/24 18:25:59 UTC

[1/3] git commit: CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.

Repository: camel
Updated Branches:
  refs/heads/camel-2.12.x 8857bd0be -> 124835080
  refs/heads/camel-2.13.x ad39395d5 -> 22ba8062a
  refs/heads/master 9105dd7f2 -> 7f05a94e3


CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7f05a94e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7f05a94e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7f05a94e

Branch: refs/heads/master
Commit: 7f05a94e33253f7817f1619ed5dd48236ee7de06
Parents: 9105dd7
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:17:26 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/EmptyProducerCache.java   | 59 +++++++++++++++++++
 .../camel/model/RecipientListDefinition.java    | 25 +++++++-
 .../camel/model/RoutingSlipDefinition.java      | 28 ++++++++-
 .../apache/camel/processor/RecipientList.java   | 27 ++++++++-
 .../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
 .../camel/impl/EmptyProducerCacheTest.java      | 61 ++++++++++++++++++++
 .../processor/RecipientListNoCacheTest.java     | 58 +++++++++++++++++++
 .../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
 8 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+    public EmptyProducerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, 0);
+    }
+
+    @Override
+    public Producer acquireProducer(Endpoint endpoint) {
+        // always create a new producer
+        Producer answer = null;
+        try {
+            answer = endpoint.createProducer();
+            // must then start service so producer is ready to be used
+            ServiceHelper.startService(answer);
+        } catch (Exception e) {
+            throw new FailedToCreateProducerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+        // stop and shutdown the producer as its not cache or reused
+        ServiceHelper.stopAndShutdownService(producer);
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyProducerCache for source: " + getSource();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     private Processor onPrepare;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RecipientListDefinition() {
     }
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         answer.setParallelProcessing(isParallelProcessing());
         answer.setStreaming(isStreaming());   
         answer.setShareUnitOfWork(isShareUnitOfWork());
+        if (getCacheSize() != null) {
+            answer.setCacheSize(getCacheSize());
+        }
         if (onPrepareRef != null) {
             onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
         }
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return shareUnitOfWork != null && shareUnitOfWork;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     private String uriDelimiter;
     @XmlAttribute
     private Boolean ignoreInvalidEndpoints;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RoutingSlipDefinition() {
         this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         if (getIgnoreInvalidEndpoints() != null) {
             routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
         }
+        if (getCacheSize() != null) {
+            routingSlip.setCacheSize(getCacheSize());
+        }
         return routingSlip;
     }
 
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     public Boolean getIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
-    
+
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         setUriDelimiter(uriDelimiter);
         return this;
     }
+
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index b1b85ed..abb635f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 
@@ -27,14 +26,18 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -47,6 +50,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  * @version 
  */
 public class RecipientList extends ServiceSupport implements AsyncProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
     private static final String IGNORE_DELIMITER_MARKER = "false";
     private final CamelContext camelContext;
     private ProducerCache producerCache;
@@ -57,6 +62,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
     private long timeout;
+    private int cacheSize;
     private Processor onPrepare;
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
@@ -171,7 +177,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startServices(aggregationStrategy, producerCache);
     }
@@ -267,4 +282,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     public void setShareUnitOfWork(boolean shareUnitOfWork) {
         this.shareUnitOfWork = shareUnitOfWork;
     }
+
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
 public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected ProducerCache producerCache;
+    protected int cacheSize;
     protected boolean ignoreInvalidEndpoints;
     protected String header;
     protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     @Override
     public String toString() {
         return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startService(producerCache);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+    public void testEmptyCache() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        Endpoint e = context.getEndpoint("direct:queue:1");
+        Producer p = cache.acquireProducer(e);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.releaseProducer(e, p);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.stop();
+    }
+
+    public void testCacheProducerAcquireAndRelease() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        for (int i = 0; i < 1003; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Producer p = cache.acquireProducer(e);
+            cache.releaseProducer(e, p);
+        }
+
+        assertEquals("Size should be 1000", 0, cache.size());
+        cache.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").recipientList(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7f05a94e/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").routingSlip(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}


[2/3] git commit: CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.

Posted by da...@apache.org.
CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.

Conflicts:
	camel-core/src/main/java/org/apache/camel/processor/RecipientList.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12483508
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12483508
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12483508

Branch: refs/heads/camel-2.12.x
Commit: 124835080e8e0e80341a7d7673608a366ba3e26e
Parents: 8857bd0
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:19:56 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/EmptyProducerCache.java   | 59 +++++++++++++++++++
 .../camel/model/RecipientListDefinition.java    | 25 +++++++-
 .../camel/model/RoutingSlipDefinition.java      | 28 ++++++++-
 .../apache/camel/processor/RecipientList.java   | 24 +++++++-
 .../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
 .../camel/impl/EmptyProducerCacheTest.java      | 61 ++++++++++++++++++++
 .../processor/RecipientListNoCacheTest.java     | 58 +++++++++++++++++++
 .../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
 8 files changed, 330 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+    public EmptyProducerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, 0);
+    }
+
+    @Override
+    public Producer acquireProducer(Endpoint endpoint) {
+        // always create a new producer
+        Producer answer = null;
+        try {
+            answer = endpoint.createProducer();
+            // must then start service so producer is ready to be used
+            ServiceHelper.startService(answer);
+        } catch (Exception e) {
+            throw new FailedToCreateProducerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+        // stop and shutdown the producer as its not cache or reused
+        ServiceHelper.stopAndShutdownService(producer);
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyProducerCache for source: " + getSource();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     private Processor onPrepare;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RecipientListDefinition() {
     }
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         answer.setParallelProcessing(isParallelProcessing());
         answer.setStreaming(isStreaming());   
         answer.setShareUnitOfWork(isShareUnitOfWork());
+        if (getCacheSize() != null) {
+            answer.setCacheSize(getCacheSize());
+        }
         if (onPrepareRef != null) {
             onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
         }
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return shareUnitOfWork != null && shareUnitOfWork;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     private String uriDelimiter;
     @XmlAttribute
     private Boolean ignoreInvalidEndpoints;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RoutingSlipDefinition() {
         this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         if (getIgnoreInvalidEndpoints() != null) {
             routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
         }
+        if (getCacheSize() != null) {
+            routingSlip.setCacheSize(getCacheSize());
+        }
         return routingSlip;
     }
 
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     public Boolean getIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
-    
+
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         setUriDelimiter(uriDelimiter);
         return this;
     }
+
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 4b700a9..94652f3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -26,6 +26,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -34,6 +35,8 @@ import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -46,6 +49,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  * @version 
  */
 public class RecipientList extends ServiceSupport implements AsyncProcessor {
+    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
     private final CamelContext camelContext;
     private ProducerCache producerCache;
     private Expression expression;
@@ -55,6 +59,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
     private long timeout;
+    private int cacheSize;
     private Processor onPrepare;
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
@@ -163,7 +168,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startServices(aggregationStrategy, producerCache);
     }
@@ -259,4 +273,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     public void setShareUnitOfWork(boolean shareUnitOfWork) {
         this.shareUnitOfWork = shareUnitOfWork;
     }
+
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
 public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected ProducerCache producerCache;
+    protected int cacheSize;
     protected boolean ignoreInvalidEndpoints;
     protected String header;
     protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     @Override
     public String toString() {
         return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startService(producerCache);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+    public void testEmptyCache() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        Endpoint e = context.getEndpoint("direct:queue:1");
+        Producer p = cache.acquireProducer(e);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.releaseProducer(e, p);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.stop();
+    }
+
+    public void testCacheProducerAcquireAndRelease() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        for (int i = 0; i < 1003; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Producer p = cache.acquireProducer(e);
+            cache.releaseProducer(e, p);
+        }
+
+        assertEquals("Size should be 1000", 0, cache.size());
+        cache.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").recipientList(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").routingSlip(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}


[3/3] git commit: CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.

Posted by da...@apache.org.
CAMEL-7393: Recipient list and routing slip eip allows to set cache size for producer cache. So you can control this, or turn it off.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/22ba8062
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/22ba8062
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/22ba8062

Branch: refs/heads/camel-2.13.x
Commit: 22ba8062a0f56abd49f16ce6e1af8af276161bba
Parents: ad39395
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Apr 24 18:17:26 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 24 18:25:41 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/EmptyProducerCache.java   | 59 +++++++++++++++++++
 .../camel/model/RecipientListDefinition.java    | 25 +++++++-
 .../camel/model/RoutingSlipDefinition.java      | 28 ++++++++-
 .../apache/camel/processor/RecipientList.java   | 27 ++++++++-
 .../org/apache/camel/processor/RoutingSlip.java | 21 ++++++-
 .../camel/impl/EmptyProducerCacheTest.java      | 61 ++++++++++++++++++++
 .../processor/RecipientListNoCacheTest.java     | 58 +++++++++++++++++++
 .../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++
 8 files changed, 332 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
new file mode 100644
index 0000000..823bf35
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.FailedToCreateProducerException;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s.
+ */
+public class EmptyProducerCache extends ProducerCache {
+
+    public EmptyProducerCache(Object source, CamelContext camelContext) {
+        super(source, camelContext, 0);
+    }
+
+    @Override
+    public Producer acquireProducer(Endpoint endpoint) {
+        // always create a new producer
+        Producer answer = null;
+        try {
+            answer = endpoint.createProducer();
+            // must then start service so producer is ready to be used
+            ServiceHelper.startService(answer);
+        } catch (Exception e) {
+            throw new FailedToCreateProducerException(endpoint, e);
+        }
+        return answer;
+    }
+
+    @Override
+    public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception {
+        // stop and shutdown the producer as its not cache or reused
+        ServiceHelper.stopAndShutdownService(producer);
+    }
+
+    @Override
+    public String toString() {
+        return "EmptyProducerCache for source: " + getSource();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index fc12fb1..c5b5796 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
     private Processor onPrepare;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RecipientListDefinition() {
     }
@@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         answer.setParallelProcessing(isParallelProcessing());
         answer.setStreaming(isStreaming());   
         answer.setShareUnitOfWork(isShareUnitOfWork());
+        if (getCacheSize() != null) {
+            answer.setCacheSize(getCacheSize());
+        }
         if (onPrepareRef != null) {
             onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
         }
@@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 
@@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext
         return shareUnitOfWork != null && shareUnitOfWork;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index ff9d3c2..bfc3173 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     private String uriDelimiter;
     @XmlAttribute
     private Boolean ignoreInvalidEndpoints;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public RoutingSlipDefinition() {
         this((String)null, DEFAULT_DELIMITER);
@@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         if (getIgnoreInvalidEndpoints() != null) {
             routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints());
         }
+        if (getCacheSize() != null) {
+            routingSlip.setCacheSize(getCacheSize());
+        }
         return routingSlip;
     }
 
@@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
     public Boolean getIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }
-    
+
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten
         setUriDelimiter(uriDelimiter);
         return this;
     }
+
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this recipient list, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public RoutingSlipDefinition<Type> cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index b1b85ed..abb635f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 
@@ -27,14 +26,18 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -47,6 +50,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
  * @version 
  */
 public class RecipientList extends ServiceSupport implements AsyncProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
     private static final String IGNORE_DELIMITER_MARKER = "false";
     private final CamelContext camelContext;
     private ProducerCache producerCache;
@@ -57,6 +62,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
     private long timeout;
+    private int cacheSize;
     private Processor onPrepare;
     private boolean shareUnitOfWork;
     private ExecutorService executorService;
@@ -171,7 +177,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startServices(aggregationStrategy, producerCache);
     }
@@ -267,4 +282,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor {
     public void setShareUnitOfWork(boolean shareUnitOfWork) {
         this.shareUnitOfWork = shareUnitOfWork;
     }
+
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index 99b9069..88e57db 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
 public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
     protected final Logger log = LoggerFactory.getLogger(getClass());
     protected ProducerCache producerCache;
+    protected int cacheSize;
     protected boolean ignoreInvalidEndpoints;
     protected String header;
     protected Expression expression;
@@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
     @Override
     public String toString() {
         return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
@@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext);
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
         }
         ServiceHelper.startService(producerCache);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
new file mode 100644
index 0000000..0f43d32
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+public class EmptyProducerCacheTest extends ContextTestSupport {
+
+    public void testEmptyCache() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        Endpoint e = context.getEndpoint("direct:queue:1");
+        Producer p = cache.acquireProducer(e);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.releaseProducer(e, p);
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        cache.stop();
+    }
+
+    public void testCacheProducerAcquireAndRelease() throws Exception {
+        ProducerCache cache = new EmptyProducerCache(this, context);
+        cache.start();
+
+        assertEquals("Size should be 0", 0, cache.size());
+
+        // we never cache any producers
+        for (int i = 0; i < 1003; i++) {
+            Endpoint e = context.getEndpoint("direct:queue:" + i);
+            Producer p = cache.acquireProducer(e);
+            cache.releaseProducer(e, p);
+        }
+
+        assertEquals("Size should be 1000", 0, cache.size());
+        cache.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
new file mode 100644
index 0000000..122e72b
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RecipientListNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").recipientList(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/22ba8062/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
new file mode 100644
index 0000000..e7bf94f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class RoutingSlipNoCacheTest extends ContextTestSupport {
+
+    public void testNoCache() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        sendBody("foo");
+        sendBody("bar");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendBody(String body) {
+        template.sendBodyAndHeader("direct:a", body, "recipientListHeader",
+                "mock:x,mock:y,mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").routingSlip(
+                        header("recipientListHeader").tokenize(",")).cacheSize(0);
+            }
+        };
+
+    }
+
+}