You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/05/20 21:55:56 UTC

svn commit: r1340818 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/seda/ test/java/org/apache/camel/issues/

Author: davsclaus
Date: Sun May 20 19:55:55 2012
New Revision: 1340818

URL: http://svn.apache.org/viewvc?rev=1340818&view=rev
Log:
CAMEL-5261: Fixed seda endpoints shutdown and restart due advice with not picking up refreshed seda queue to use between producers and consumers.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=1340818&r1=1340817&r2=1340818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Sun May 20 19:55:55 2012
@@ -53,7 +53,7 @@ public class SedaComponent extends Defau
         return defaultConcurrentConsumers;
     }
 
-    public synchronized BlockingQueue<Exchange> createQueue(String uri, Map<String, Object> parameters) {
+    public synchronized BlockingQueue<Exchange> getOrCreateQueue(String uri, Integer size) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -65,7 +65,6 @@ public class SedaComponent extends Defau
 
         // create queue
         BlockingQueue<Exchange> queue;
-        Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
         if (size != null && size > 0) {
             queue = new LinkedBlockingQueue<Exchange>(size);
         } else {
@@ -96,7 +95,8 @@ public class SedaComponent extends Defau
             throw new IllegalArgumentException("The limitConcurrentConsumers flag in set to true. ConcurrentConsumers cannot be set at a value greater than "
                     + maxConcurrentConsumers + " was " + consumers);
         }
-        SedaEndpoint answer = new SedaEndpoint(uri, this, createQueue(uri, parameters), consumers);
+        Integer size = getAndRemoveParameter(parameters, "size", Integer.class);
+        SedaEndpoint answer = new SedaEndpoint(uri, this, getOrCreateQueue(uri, size), consumers);
         answer.configureProperties(parameters);
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1340818&r1=1340817&r2=1340818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Sun May 20 19:55:55 2012
@@ -94,15 +94,27 @@ public class SedaEndpoint extends Defaul
 
     public synchronized BlockingQueue<Exchange> getQueue() {
         if (queue == null) {
-            if (size > 0) {
-                queue = new LinkedBlockingQueue<Exchange>(size);
+            // prefer to lookup queue from component, so if this endpoint is re-created or re-started
+            // then the existing queue from the component can be used, so new producers and consumers
+            // can use the already existing queue referenced from the component
+            if (getComponent() != null) {
+                queue = getComponent().getOrCreateQueue(getEndpointUri(), getSize());
             } else {
-                queue = new LinkedBlockingQueue<Exchange>();
+                // fallback and create queue (as this endpoint has no component)
+                queue = createQueue();
             }
         }
         return queue;
     }
 
+    protected BlockingQueue<Exchange> createQueue() {
+        if (size > 0) {
+            return new LinkedBlockingQueue<Exchange>(size);
+        } else {
+            return new LinkedBlockingQueue<Exchange>();
+        }
+    }
+
     protected synchronized MulticastProcessor getConsumerMulticastProcessor() throws Exception {
         if (!multicastStarted && consumerMulticastProcessor != null) {
             // only start it on-demand to avoid starting it during stopping
@@ -363,6 +375,10 @@ public class SedaEndpoint extends Defaul
             getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
             multicastExecutor = null;
         }
+
+        // clear queue, as we are shutdown, so if re-created then the queue must be updated
+        queue = null;
+
         super.doShutdown();
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java?rev=1340818&r1=1340817&r2=1340818&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java Sun May 20 19:55:55 2012
@@ -27,7 +27,6 @@ import org.apache.camel.WaitForTaskToCom
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.URISupport;
 
 /**
  * @version 

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java?rev=1340818&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/AdviceWithUrlIssueTest.java Sun May 20 19:55:55 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.AdviceWithRouteBuilder;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class AdviceWithUrlIssueTest extends ContextTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:test?concurrentConsumers=1").routeId("sedaroute")
+                    .to("log:before")
+                    .to("mock:target");
+            }
+        };
+    }
+
+    public void testProducerWithDifferentUri() throws Exception {
+        context.getRouteDefinition("sedaroute").adviceWith(context, new Advice());
+
+        getMockEndpoint("mock:target").expectedMessageCount(0);
+        getMockEndpoint("mock:target2").expectedMessageCount(1);
+
+        template.requestBody("seda:test", "TESTING");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testProducerWithSameUri() throws Exception {
+        context.getRouteDefinition("sedaroute").adviceWith(context, new Advice());
+
+        getMockEndpoint("mock:target").expectedMessageCount(0);
+        getMockEndpoint("mock:target2").expectedMessageCount(1);
+
+        template.requestBody("seda:test?concurrentConsumers=1", "TESTING");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    private class Advice extends AdviceWithRouteBuilder {
+        @Override
+        public void configure() throws Exception {
+            interceptSendToEndpoint("mock:target").skipSendToOriginalEndpoint().to("mock:target2");
+        }
+    }
+
+}