You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/13 14:08:33 UTC
git commit: Seda consumer should validate that thet are all have same
multiple consumers option as they cannot have different values. This is per
queue.
Updated Branches:
refs/heads/camel-2.11.x 75a171143 -> 2d5929fd3
Seda consumer should validate that thet are all have same multiple consumers option as they cannot have different values. This is per queue.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d5929fd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d5929fd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d5929fd
Branch: refs/heads/camel-2.11.x
Commit: 2d5929fd3444c8a882ceedd72819915fc39ee199
Parents: 75a1711
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 12 16:00:29 2013 -0400
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 13 14:07:09 2013 +0200
----------------------------------------------------------------------
.../camel/component/seda/SedaComponent.java | 22 +++++-
.../camel/component/seda/SedaEndpoint.java | 13 +++-
...edaQueueMultipleConsumersDifferenceTest.java | 70 ++++++++++++++++++++
3 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index 8cb2b2b..7528fa8 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -56,7 +56,15 @@ public class SedaComponent extends DefaultComponent {
return defaultConcurrentConsumers;
}
+ /**
+ * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)}
+ */
+ @Deprecated
public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
+ return getOrCreateQueue(uri, size, null);
+ }
+
+ public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean multipleConsumers) {
String key = getQueueKey(uri);
QueueReference ref = getQueues().get(key);
@@ -92,7 +100,7 @@ public class SedaComponent extends DefaultComponent {
log.debug("Created queue {} with size {}", key, size);
// create and add a new reference queue
- ref = new QueueReference(queue, size);
+ ref = new QueueReference(queue, size, multipleConsumers);
ref.addReference();
getQueues().put(key, ref);
@@ -103,6 +111,10 @@ public class SedaComponent extends DefaultComponent {
return queues;
}
+ public QueueReference getQueueReference(String key) {
+ return queues.get(key);
+ }
+
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class, defaultConcurrentConsumers);
@@ -160,10 +172,12 @@ public class SedaComponent extends DefaultComponent {
private final BlockingQueue<Exchange> queue;
private volatile int count;
private Integer size;
+ private Boolean multipleConsumers;
- private QueueReference(BlockingQueue<Exchange> queue, Integer size) {
+ private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean multipleConsumers) {
this.queue = queue;
this.size = size;
+ this.multipleConsumers = multipleConsumers;
}
void addReference() {
@@ -190,6 +204,10 @@ public class SedaComponent extends DefaultComponent {
return size;
}
+ public Boolean getMultipleConsumers() {
+ return multipleConsumers;
+ }
+
/**
* Gets the queue
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 3e1fa2b..7faa956 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -95,6 +95,17 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
}
public Consumer createConsumer(Processor processor) throws Exception {
+ if (getComponent() != null) {
+ // all consumers must match having the same multipleConsumers options
+ String key = getComponent().getQueueKey(getEndpointUri());
+ SedaComponent.QueueReference ref = getComponent().getQueueReference(key);
+ if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) {
+ // there is already a multiple consumers, so make sure they matches
+ throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue multiple consumers "
+ + ref.getMultipleConsumers() + " does not match given multiple consumers " + multipleConsumers);
+ }
+ }
+
Consumer answer = new SedaConsumer(this, processor);
configureConsumer(answer);
return answer;
@@ -108,7 +119,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
if (getComponent() != null) {
// use null to indicate default size (= use what the existing queue has been configured with)
Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
- SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size);
+ SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size, isMultipleConsumers());
queue = ref.getQueue();
String key = getComponent().getQueueKey(getEndpointUri());
LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE});
http://git-wip-us.apache.org/repos/asf/camel/blob/2d5929fd/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
new file mode 100644
index 0000000..98856df
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SameSedaQueueMultipleConsumersDifferenceTest extends ContextTestSupport {
+
+ public void testSameOptions() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+ template.sendBody("seda:foo?multipleConsumers=true", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSameOptionsProducerStillOkay() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+ template.sendBody("seda:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testAddConsumer() throws Exception {
+ try {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo").routeId("fail").to("mock:fail");
+ }
+ });
+ fail("Should have thrown exception");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Cannot use existing queue seda://foo as the existing queue multiple consumers true does not match given multiple consumers false", e.getMessage());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo?multipleConsumers=true").routeId("foo").to("mock:foo");
+ from("seda:foo?multipleConsumers=true").routeId("bar").to("mock:bar");
+ }
+ };
+ }
+}