You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/28 17:00:56 UTC

svn commit: r928423 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/component/direct/ test/java/org/apache/camel/processor/

Author: davsclaus
Date: Sun Mar 28 15:00:56 2010
New Revision: 928423

URL: http://svn.apache.org/viewvc?rev=928423&view=rev
Log:
CAMEL-2590: Routes from direct endpoints are automatically deferred on shutdown, to offer a more graceful shutdown when using direct endpoints.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java   (contents, props changed)
      - copied, changed from r928220, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java?rev=928423&r1=928422&r2=928423&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java Sun Mar 28 15:00:56 2010
@@ -18,14 +18,16 @@ package org.apache.camel.component.direc
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.ShutdownAware;
 
 /**
  * The direct consumer.
  *
  * @version $Revision$
  */
-public class DirectConsumer extends DefaultConsumer {
+public class DirectConsumer extends DefaultConsumer implements ShutdownAware {
 
     private DirectEndpoint endpoint;
 
@@ -52,4 +54,15 @@ public class DirectConsumer extends Defa
         endpoint.getConsumers().remove(this);
     }
 
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // deny stopping on shutdown as we want direct consumers to run in case some other queues
+        // depend on this consumer to run, so it can complete its exchanges
+        return true;
+    }
+
+    public int getPendingExchangesSize() {
+        // return 0 as we do not have an internal memory queue with a variable size
+        // of inflight messages. 
+        return 0;
+    }
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java?rev=928423&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java Sun Mar 28 15:00:56 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ShutdownDirectEndpointTest extends ContextTestSupport {
+
+    public void testShutdownDirectEndpoint() throws Exception {
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMessageCount(5);
+
+        // send the bodies async so we can continue
+        template.asyncSendBody("direct:bar", "A");
+        template.asyncSendBody("direct:bar", "B");
+        template.asyncSendBody("direct:bar", "C");
+        template.asyncSendBody("direct:bar", "D");
+        template.asyncSendBody("direct:bar", "E");
+
+        Thread.sleep(1000);
+
+        context.stop();
+
+        assertEquals("Should complete all messages", 5, bar.getReceivedCounter());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:bar")
+                    .delay(2000)
+                    .to("mock:bar");
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDirectEndpointTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java (from r928220, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java&r1=928220&r2=928423&rev=928423&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java Sun Mar 28 15:00:56 2010
@@ -25,20 +25,11 @@ import static org.apache.camel.ShutdownR
 /**
  * @version $Revision$
  */
-public class ShutdownDeferTest extends ContextTestSupport {
+public class ShutdownSedaAndDirectEndpointTest extends ContextTestSupport {
 
-    @Override
-    protected void setUp() throws Exception {
-        deleteDirectory("target/deferred");
-        super.setUp();
-    }
-
-    public void testShutdownDeferred() throws Exception {
-        // give it 20 seconds to shutdown
-        context.getShutdownStrategy().setTimeout(20);
-        
+    public void testShutdownSedaAndDirectEndpoint() throws Exception {
         MockEndpoint bar = getMockEndpoint("mock:bar");
-        bar.expectedMinimumMessageCount(1);
+        bar.expectedMessageCount(5);
 
         template.sendBody("seda:foo", "A");
         template.sendBody("seda:foo", "B");
@@ -46,33 +37,24 @@ public class ShutdownDeferTest extends C
         template.sendBody("seda:foo", "D");
         template.sendBody("seda:foo", "E");
 
-        assertMockEndpointsSatisfied();
-
         context.stop();
 
-        // should route about 4 - 5 (in some rare cases it will only route 4)
-        assertTrue("Should complete all messages, was " + bar.getReceivedCounter(), bar.getReceivedCounter() >= 4);
+        assertEquals("Should complete all messages", 5, bar.getReceivedCounter());
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override
-            // START SNIPPET: e1
             public void configure() throws Exception {
                 from("seda:foo")
                     .startupOrder(1)
-                    .delay(1000).to("file://target/deferred");
+                    .to("direct:bar");
 
-                // use file component to transfer files from route 1 -> route 2 as it
-                // will normally suspend, but by deferring this we can let route 1
-                // complete while shutting down
-                from("file://target/deferred")
-                    // defer shutting down this route as the 1st route depends upon it
-                    .startupOrder(2).shutdownRoute(Defer)
+                from("direct:bar")
+                    .delay(1000)
                     .to("mock:bar");
             }
-            // END SNIPPET: e1
         };
     }
-}
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownSedaAndDirectEndpointTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date