You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/11/25 18:29:10 UTC

svn commit: r1413377 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ test/java/org/apache/camel/component/seda/ test/java/org/apache/camel/component/vm/ test/java/org/apache/camel/processor/interceptor/

Author: davsclaus
Date: Sun Nov 25 17:29:08 2012
New Revision: 1413377

URL: http://svn.apache.org/viewvc?rev=1413377&view=rev
Log:
CAMEL-5793: Validate that seda/vm endpoints sharing a queue do not have mismatched size options; otherwise users may end up using a seda queue with a size they did not expect. Add INFO logging to show users which queue and size are in use.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sun Nov 25 17:29:08 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the <a href="http://camel.apache.org/seda.html">SEDA components</a>
@@ -32,6 +34,7 @@ import org.apache.camel.impl.DefaultComp
  * @version 
  */
 public class SedaComponent extends DefaultComponent {
+    protected final transient Logger log = LoggerFactory.getLogger(getClass());
     protected final int maxConcurrentConsumers = 500;
     protected int queueSize;
     protected int defaultConcurrentConsumers = 1;
@@ -53,14 +56,25 @@ public class SedaComponent extends Defau
         return defaultConcurrentConsumers;
     }
 
-    public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) {
+    public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
         if (ref != null) {
+
+            // a null size means "use the existing queue as is"; a given size must equal the existing queue size
+            if (size != null && !size.equals(ref.getSize())) {
+                // compared by value: != on two boxed Integers checks identity and would reject equal sizes above 127
+                throw new IllegalArgumentException("Cannot use existing queue " + key + " as the existing queue size "
+                        + (ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE) + " does not match given queue size " + size);
+            }
             // add the reference before returning queue
             ref.addReference();
-            return ref.getQueue();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Reusing existing queue {} with size {} and reference count {}", new Object[]{key, size, ref.getCount()});
+            }
+            return ref;
         }
 
         // create queue
@@ -69,18 +83,20 @@ public class SedaComponent extends Defau
             queue = new LinkedBlockingQueue<Exchange>(size);
         } else {
             if (getQueueSize() > 0) {
+                size = getQueueSize();
                 queue = new LinkedBlockingQueue<Exchange>(getQueueSize());
             } else {
                 queue = new LinkedBlockingQueue<Exchange>();
             }
         }
+        log.debug("Created queue {} with size {}", key, size);
 
         // create and add a new reference queue
-        ref = new QueueReference(queue);
+        ref = new QueueReference(queue, size);
         ref.addReference();
         getQueues().put(key, ref);
 
-        return queue;
+        return ref;
     }
 
     public Map<String, QueueReference> getQueues() {
@@ -95,8 +111,8 @@ public class SedaComponent extends Defau
             throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-        Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
-        SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers);
+        // defer creating queue till endpoint is started, so we pass in null
+        SedaEndpoint answer = new SedaEndpoint(uri, this, null, consumers);
         answer.configureProperties(parameters);
         return answer;
     }
@@ -143,9 +159,11 @@ public class SedaComponent extends Defau
         
         private final BlockingQueue<Exchange> queue;
         private volatile int count;
+        private Integer size;
 
-        private QueueReference(BlockingQueue<Exchange> queue) {
+        private QueueReference(BlockingQueue<Exchange> queue, Integer size) {
             this.queue = queue;
+            this.size = size;
         }
         
         void addReference() {
@@ -164,6 +182,15 @@ public class SedaComponent extends Defau
         }
 
         /**
+         * Gets the queue size
+         *
+         * @return <tt>null</tt> if unbounded
+         */
+        public Integer getSize() {
+            return size;
+        }
+
+        /**
          * Gets the queue
          */
         public BlockingQueue<Exchange> getQueue() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sun Nov 25 17:29:08 2012
@@ -43,6 +43,8 @@ import org.apache.camel.util.EndpointHel
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the <a
@@ -51,8 +53,9 @@ import org.apache.camel.util.URISupport;
  */
 @ManagedResource(description = "Managed SedaEndpoint")
 public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, MultipleConsumersSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class);
     private volatile BlockingQueue<Exchange> queue;
-    private int size;
+    private int size = Integer.MAX_VALUE;
     private int concurrentConsumers = 1;
     private volatile ExecutorService multicastExecutor;
     private boolean multipleConsumers;
@@ -75,7 +78,9 @@ public class SedaEndpoint extends Defaul
     public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) {
         super(endpointUri, component);
         this.queue = queue;
-        this.size = queue.remainingCapacity();
+        if (queue != null) {
+            this.size = queue.remainingCapacity();
+        }
         this.concurrentConsumers = concurrentConsumers;
     }
 
@@ -98,10 +103,20 @@ public class SedaEndpoint extends Defaul
             // then the existing queue from the component can be used, so new producers and consumers
             // can use the already existing queue referenced from the component
             if (getComponent() != null) {
-                queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize());
+                // use null to indicate default size (= use what the existing queue has been configured with)
+                Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
+                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(), size);
+                queue = ref.getQueue();
+                String key = getComponent().getQueueKey(getEndpointUri());
+                LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this, key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});
+                // and set the size we are using
+                if (ref.getSize() != null) {
+                    setSize(ref.getSize());
+                }
             } else {
                 // fallback and create queue (as this endpoint has no component)
                 queue = createQueue();
+                LOG.info("Endpoint {} is using queue: {} with size: {}", new Object[]{this, getEndpointUri(), getSize()});
             }
         }
         return queue;
@@ -358,6 +373,11 @@ public class SedaEndpoint extends Defaul
     protected void doStart() throws Exception {
         super.doStart();
 
+        // force creating queue when starting
+        if (queue == null) {
+            queue = getQueue();
+        }
+
         // special for unit testing where we can set a system property to make seda poll faster
         // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project
         String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout());

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java?rev=1413377&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
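+ * Tests that seda endpoints sharing a queue must agree on its size: no size reuses the existing queue, a different size is rejected.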
+ */
+public class SameSedaQueueSizeAndNoSizeTest extends ContextTestSupport {
+
+    public void testSameQueue() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:foo", "" + i);
+        }
+
+        try {
+            template.sendBody("seda:foo", "Should be full now");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause());
+            assertEquals("Queue full", ise.getMessage());
+        }
+    }
+
+    public void testSameQueueDifferentSize() throws Exception {
+        try {
+            template.sendBody("seda:foo?size=200", "Should fail");
+            fail("Should fail");
+        } catch (ResolveEndpointFailedException e) {
+            IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Cannot use existing queue seda://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage());
+        }
+    }
+
+    public void testSameQueueDifferentSizeBar() throws Exception {
+        try {
+            template.sendBody("seda:bar?size=200", "Should fail");
+            fail("Should fail");
+        } catch (ResolveEndpointFailedException e) {
+            IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Cannot use existing queue seda://bar as the existing queue size " + Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?size=100").routeId("foo").noAutoStartup()
+                    .to("mock:foo");
+
+                from("seda:bar").routeId("bar").noAutoStartup()
+                    .to("mock:bar");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaQueueTest.java Sun Nov 25 17:29:08 2012
@@ -40,7 +40,7 @@ public class SedaQueueTest extends Conte
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("seda:foo?concurrentConsumers=2").to("mock:result");
+                from("seda:foo?size=20&concurrentConsumers=2").to("mock:result");
 
                 from("seda:bar").to("mock:result");
             }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java?rev=1413377&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java Sun Nov 25 17:29:08 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
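+ * Tests that vm endpoints sharing a queue must agree on its size: no size reuses the existing queue, a different size is rejected.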
+ */
+public class SameVmQueueSizeAndNoSizeTest extends ContextTestSupport {
+
+    public void testSameQueue() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("vm:foo", "" + i);
+        }
+
+        try {
+            template.sendBody("vm:foo", "Should be full now");
+            fail("Should fail");
+        } catch (CamelExecutionException e) {
+            IllegalStateException ise = assertIsInstanceOf(IllegalStateException.class, e.getCause());
+            assertEquals("Queue full", ise.getMessage());
+        }
+    }
+
+    public void testSameQueueDifferentSize() throws Exception {
+        try {
+            template.sendBody("vm:foo?size=200", "Should fail");
+            fail("Should fail");
+        } catch (ResolveEndpointFailedException e) {
+            IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Cannot use existing queue vm://foo as the existing queue size 100 does not match given queue size 200", ise.getMessage());
+        }
+    }
+
+    public void testSameQueueDifferentSizeBar() throws Exception {
+        try {
+            template.sendBody("vm:bar?size=200", "Should fail");
+            fail("Should fail");
+        } catch (ResolveEndpointFailedException e) {
+            IllegalArgumentException ise = assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Cannot use existing queue vm://bar as the existing queue size " + Integer.MAX_VALUE + " does not match given queue size 200", ise.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("vm:foo?size=100").routeId("foo").noAutoStartup()
+                    .to("mock:foo");
+
+                from("vm:bar").routeId("bar").noAutoStartup()
+                    .to("mock:bar");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/SameVmQueueSizeAndNoSizeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmQueueTest.java Sun Nov 25 17:29:08 2012
@@ -49,7 +49,7 @@ public class VmQueueTest extends Abstrac
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("vm:foo?concurrentConsumers=2").to("mock:result");
+                from("vm:foo?size=20&concurrentConsumers=2").to("mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmUseSameQueueTest.java Sun Nov 25 17:29:08 2012
@@ -37,7 +37,7 @@ public class VmUseSameQueueTest extends 
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("vm:foo").to("mock:result");
+                from("vm:foo?size=500").to("mock:result");
             }
         };
     }
@@ -47,7 +47,7 @@ public class VmUseSameQueueTest extends 
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("vm:foo?size=500");
+                from("direct:start").to("vm:foo");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java?rev=1413377&r1=1413376&r2=1413377&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/AdviceWithMockEndpointsHavingParameterTest.java Sun Nov 25 17:29:08 2012
@@ -37,7 +37,7 @@ public class AdviceWithMockEndpointsHavi
     public void testAdvisedMockEndpoints() throws Exception {
         // advice the first route using the inlined AdviceWith route builder
         // which has extended capabilities than the regular route builder
-        context.getRouteDefinitions().get(0).adviceWith(context, new AdviceWithRouteBuilder() {
+        context.getRouteDefinitions().get(1).adviceWith(context, new AdviceWithRouteBuilder() {
             @Override
             public void configure() throws Exception {
                 // mock all endpoints (will mock in all routes)
@@ -74,16 +74,16 @@ public class AdviceWithMockEndpointsHavi
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                from("seda:foo?size=20")
+                        .transform(constant("Bye World"))
+                        .log("We transformed ${body}")
+                        .to("log:foo?showHeaders=false")
+                        .to("mock:foo");
+
                 from("direct:start")
                     .to("seda:foo")
                     .to("log:start?showAll=true")
                     .to("mock:result");
-
-                from("seda:foo?size=20")
-                    .transform(constant("Bye World"))
-                    .log("We transformed ${body}")
-                    .to("log:foo?showHeaders=false")
-                    .to("mock:foo");
             }
         };
     }