You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/13 14:08:33 UTC

git commit: Seda consumer should validate that thet are all have same multiple consumers option as they cannot have different values. This is per queue.

Updated Branches:
  refs/heads/camel-2.11.x 75a171143 -> 2d5929fd3


Seda consumer should validate that thet are all have same multiple consumers option as they cannot have different values. This is per queue.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d5929fd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d5929fd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d5929fd

Branch: refs/heads/camel-2.11.x
Commit: 2d5929fd3444c8a882ceedd72819915fc39ee199
Parents: 75a1711
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 12 16:00:29 2013 -0400
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 13 14:07:09 2013 +0200

----------------------------------------------------------------------
 .../camel/component/seda/SedaComponent.java     | 22 +++++-
 .../camel/component/seda/SedaEndpoint.java      | 13 +++-
 ...edaQueueMultipleConsumersDifferenceTest.java | 70 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index 8cb2b2b..7528fa8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -56,7 +56,15 @@ public class SedaComponent extends DefaultComponent {
         return defaultConcurrentConsumers;
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)}
+     */
+    @Deprecated
     public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
+        return getOrCreateQueue(uri, size, null);
+    }
+
+    public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -92,7 +100,7 @@ public class SedaComponent extends DefaultComponent {
         log.debug("Created queue {} with size {}", key, size);
 
         // create and add a new reference queue
-        ref = new QueueReference(queue, size);
+        ref = new QueueReference(queue, size, multipleConsumers);
         ref.addReference();
         getQueues().put(key, ref);
 
@@ -103,6 +111,10 @@ public class SedaComponent extends DefaultComponent {
         return queues;
     }
 
+    public QueueReference getQueueReference(String key) {
+        return queues.get(key);
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
@@ -160,10 +172,12 @@ public class SedaComponent extends DefaultComponent {
         private final BlockingQueue<Exchange> queue;
         private volatile int count;
         private Integer size;
+        private Boolean multipleConsumers;
 
-        private QueueReference(BlockingQueue<Exchange> queue, Integer size) {
+        private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) {
             this.queue = queue;
             this.size = size;
+            this.multipleConsumers = multipleConsumers;
         }
         
         void addReference() {
@@ -190,6 +204,10 @@ public class SedaComponent extends DefaultComponent {
             return size;
         }
 
+        public Boolean getMultipleConsumers() {
+            return multipleConsumers;
+        }
+
         /**
          * Gets the queue
          */

http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 3e1fa2b..7faa956 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -95,6 +95,17 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
+        if (getComponent() != null) {
+            // all consumers must match having the same multipleConsumers options
+            String key = getComponent().getQueueKey(getEndpointUri());
+            SedaComponent.QueueReference ref = getComponent().getQueueReference(key);
+            if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) {
+                // there is already a multiple consumers, so make sure they matches
+                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers "
+                        + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers);
+            }
+        }
+
         Consumer answer = new SedaConsumer(this, processor);
         configureConsumer(answer);
         return answer;
@@ -108,7 +119,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (getComponent() != null) {
                 // use null to indicate default size (= use what the existing queue has been configured with)
                 Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
-                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size);
+                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers());
                 queue = ref.getQueue();
                 String key = getComponent().getQueueKey(getEndpointUri());
                 LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});

http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
new file mode 100644
index 0000000..98856df
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SameSedaQueueMultipleConsumersDifferenceTest extends ContextTestSupport {
+
+    public void testSameOptions() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo?multipleConsumers=true", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSameOptionsProducerStillOkay() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAddConsumer() throws Exception {
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("seda:foo").routeId("fail").to("mock:fail");
+                }
+            });
+            fail("Should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Cannot use existing queue seda://foo as the existing queue multiple consumers true does not match given multiple consumers false", e.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?multipleConsumers=true").routeId("foo").to("mock:foo");
+                from("seda:foo?multipleConsumers=true").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+}