You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/25 18:29:10 UTC
svn commit: r1413377 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/
test/java/org/apache/camel/component/seda/
test/java/org/apache/camel/component/vm/
test/java/org/apache/camel/processor/interceptor/
Author: davsclaus
Date: Sun Nov 25 17:29:08 2012
New Revision: 1413377
URL: http://svn.apache.org/viewvc?rev=1413377&view=rev
Log:
CAMEL-5793: Validate seda/vm endpoints when using size options that there is no miss-match. Otherwise users may use seda queues with a size they would not expect. Add INFO logging to show users what queue and sizes are in use.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sun Nov 25 17:29:08 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlocki
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a>
@@ -32,6 +34,7 @@ import org.apache.camel.impl.DefaultComp
* @version
*/
public class SedaComponent extends DefaultComponent {
+ protected final transient Logger log = LoggerFactory.getLogger(getClass());
protected final int maxConcurrentConsumers = 500;
protected int queueSize;
protected int defaultConcurrentConsumers = 1;
@@ -53,14 +56,25 @@ public class SedaComponent extends Defau
return defaultConcurrentConsumers;
}
- public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) {
+ public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
String key = getQueueKey(uri);
QueueReference ref = getQueues().get(key);
if (ref != null) {
+
+ // if the given size is not provided, we just use the existing queue as is
+ if (size != null && ref.getSize() != size) {
+ // there is already a queue, so make sure the size matches
+ throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
+ + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size);
+ }
// add the reference before returning queue
ref.addReference();
- return ref.getQueue();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()});
+ }
+ return ref;
}
// create queue
@@ -69,18 +83,20 @@ public class SedaComponent extends Defau
queue = new LinkedBlockingQueue<Exchange>(size);
} else {
if (getQueueSize() > 0) {
+ size = getQueueSize();
queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
} else {
queue = new LinkedBlockingQueue<Exchange>();
}
}
+ log.debug("Created queue {} with size {}", key, size);
// create and add a new reference queue
- ref = new QueueReference(queue);
+ ref = new QueueReference(queue, size);
ref.addReference();
getQueues().put(key, ref);
- return queue;
+ return ref;
}
public Map<String, QueueReference> getQueues() {
@@ -95,8 +111,8 @@ public class SedaComponent extends Defau
throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
+ maxConcurrentConsumers + " was " + consumers);
}
- Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
- SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers);
+ // defer creating queue till endpoint is started, so we pass in null
+ SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers);
answer.configureProperties(parameters);
return answer;
}
@@ -143,9 +159,11 @@ public class SedaComponent extends Defau
private final BlockingQueue<Exchange> queue;
private volatile int count;
+ private Integer size;
- private QueueReference(BlockingQueue<Exchange> queue) {
+ private QueueReference(BlockingQueue<Exchange> queue, Integer size) {
this.queue = queue;
+ this.size = size;
}
void addReference() {
@@ -164,6 +182,15 @@ public class SedaComponent extends Defau
}
/**
+ * Gets the queue size
+ *
+ * @return <tt>null</tt> if unbounded
+ */
+ public Integer getSize() {
+ return size;
+ }
+
+ /**
* Gets the queue
*/
public BlockingQueue<Exchange> getQueue() {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sun Nov 25 17:29:08 2012
@@ -43,6 +43,8 @@ import org.apache.camel.util.EndpointHel
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of the <a
@@ -51,8 +53,9 @@ import org.apache.camel.util.URISupport;
*/
@ManagedResource(description = "Managed SedaEndpoint")
public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
+ private static final transient Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);
private volatile BlockingQueue<Exchange> queue;
- private int size;
+ private int size = Integer.MAX_VALUE;
private int concurrentConsumers = 1;
private volatile ExecutorService multicastExecutor;
private boolean multipleConsumers;
@@ -75,7 +78,9 @@ public class SedaEndpoint extends Defaul
public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
super(endpointUri, component);
this.queue = queue;
- this.size = queue.remainingCapacity();
+ if (queue != null) {
+ this.size = queue.remainingCapacity();
+ }
this.concurrentConsumers = concurrentConsumers;
}
@@ -98,10 +103,20 @@ public class SedaEndpoint extends Defaul
// then the existing queue from the component can be used, so new producers and consumers
// can use the already existing queue referenced from the component
if (getComponent() != null) {
- queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize());
+ // use null to indicate default size (= use what the existing queue has been configured with)
+ Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
+ SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size);
+ queue = ref.getQueue();
+ String key = getComponent().getQueueKey(getEndpointUri());
+ LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE});
+ // and set the size we are using
+ if (ref.getSize() != null) {
+ setSize(ref.getSize());
+ }
} else {
// fallback and create queue (as this endpoint has no component)
queue = createQueue();
+ LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()});
}
}
return queue;
@@ -358,6 +373,11 @@ public class SedaEndpoint extends Defaul
protected void doStart() throws Exception {
super.doStart();
+ // force creating queue when starting
+ if (queue == null) {
+ queue = getQueue();
+ }
+
// special for unit testing where we can set a system property to make seda poll faster
// and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java?rev=1413377&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SameSedaQueueSizeAndNoSizeTest extends ContextTestSupport {
+
+ public void testSameQueue() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("seda:foo", "" + i);
+ }
+
+ try {
+ template.sendBody("seda:foo", "Should be full now");
+ fail("Should fail");
+ } catch (CamelExecutionException e) {
+ IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause());
+ assertEquals("Queue full", ise.getMessage());
+ }
+ }
+
+ public void testSameQueueDifferentSize() throws Exception {
+ try {
+ template.sendBody("seda:foo?size=200", "Should fail");
+ fail("Should fail");
+ } catch (ResolveEndpointFailedException e) {
+ IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Cannot use existing queue seda://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage());
+ }
+ }
+
+ public void testSameQueueDifferentSizeBar() throws Exception {
+ try {
+ template.sendBody("seda:bar?size=200", "Should fail");
+ fail("Should fail");
+ } catch (ResolveEndpointFailedException e) {
+ IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Cannot use existing queue seda://bar as the existing queue size " + Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo?size=100").routeId("foo").noAutoStartup()
+ .to("mock:foo");
+
+ from("seda:bar").routeId("bar").noAutoStartup()
+ .to("mock:bar");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java Sun Nov 25 17:29:08 2012
@@ -40,7 +40,7 @@ public class SedaQueueTest extends Conte
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("seda:foo?concurrentConsumers=2").to("mock:result");
+ from("seda:foo?size=20&concurrentConsumers=2").to("mock:result");
from("seda:bar").to("mock:result");
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java?rev=1413377&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SameVmQueueSizeAndNoSizeTest extends ContextTestSupport {
+
+ public void testSameQueue() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ template.sendBody("vm:foo", "" + i);
+ }
+
+ try {
+ template.sendBody("vm:foo", "Should be full now");
+ fail("Should fail");
+ } catch (CamelExecutionException e) {
+ IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause());
+ assertEquals("Queue full", ise.getMessage());
+ }
+ }
+
+ public void testSameQueueDifferentSize() throws Exception {
+ try {
+ template.sendBody("vm:foo?size=200", "Should fail");
+ fail("Should fail");
+ } catch (ResolveEndpointFailedException e) {
+ IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Cannot use existing queue vm://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage());
+ }
+ }
+
+ public void testSameQueueDifferentSizeBar() throws Exception {
+ try {
+ template.sendBody("vm:bar?size=200", "Should fail");
+ fail("Should fail");
+ } catch (ResolveEndpointFailedException e) {
+ IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Cannot use existing queue vm://bar as the existing queue size " + Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage());
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("vm:foo?size=100").routeId("foo").noAutoStartup()
+ .to("mock:foo");
+
+ from("vm:bar").routeId("bar").noAutoStartup()
+ .to("mock:bar");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java Sun Nov 25 17:29:08 2012
@@ -49,7 +49,7 @@ public class VmQueueTest extends Abstrac
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("vm:foo?concurrentConsumers=2").to("mock:result");
+ from("vm:foo?size=20&concurrentConsumers=2").to("mock:result");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java Sun Nov 25 17:29:08 2012
@@ -37,7 +37,7 @@ public class VmUseSameQueueTest extends
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("vm:foo").to("mock:result");
+ from("vm:foo?size=500").to("mock:result");
}
};
}
@@ -47,7 +47,7 @@ public class VmUseSameQueueTest extends
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("vm:foo?size=500");
+ from("direct:start").to("vm:foo");
}
};
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java Sun Nov 25 17:29:08 2012
@@ -37,7 +37,7 @@ public class AdviceWithMockEndpointsHavi
public void testAdvisedMockEndpoints() throws Exception {
// advice the first route using the inlined AdviceWith route builder
// which has extended capabilities than the regular route builder
- context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() {
+ context.getRouteDefinitions().get(1).adviceWith(context, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
// mock all endpoints (will mock in all routes)
@@ -74,16 +74,16 @@ public class AdviceWithMockEndpointsHavi
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ from("seda:foo?size=20")
+ .transform(constant("Bye World"))
+ .log("We transformed ${body}")
+ .to("log:foo?showHeaders=false")
+ .to("mock:foo");
+
from("direct:start")
.to("seda:foo")
.to("log:start?showAll=true")
.to("mock:result");
-
- from("seda:foo?size=20")
- .transform(constant("Bye World"))
- .log("We transformed ${body}")
- .to("log:foo?showHeaders=false")
- .to("mock:foo");
}
};
}