You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/22 10:27:56 UTC

[5/8] camel git commit: CAMEL-9150: Seda suspend/resume should not trigger start/stop logic

CAMEL-9150: Seda suspend/resume should not trigger start/stop logic


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e75d37b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e75d37b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e75d37b3

Branch: refs/heads/master
Commit: e75d37b3918eb957924e5122672accc6c01c72fc
Parents: 20ec49a
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 22 09:52:37 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 22 09:52:37 2015 +0200

----------------------------------------------------------------------
 .../camel/component/direct/DirectConsumer.java  |  2 +-
 .../camel/component/seda/SedaConsumer.java      |  9 ++++++++-
 .../camel/impl/DefaultShutdownStrategy.java     | 20 ++++++++++++--------
 .../impl/ScheduledBatchPollingConsumer.java     |  2 +-
 .../camel/processor/RedeliveryErrorHandler.java |  2 +-
 .../processor/aggregate/AggregateProcessor.java |  2 +-
 .../org/apache/camel/spi/ShutdownPrepared.java  |  5 +++--
 ...ntextSuspendResumeRouteStartupOrderTest.java |  4 ++++
 ...faultCamelContextSuspendResumeRouteTest.java |  6 ++++++
 .../camel/impl/TwoRouteSuspendResumeTest.java   |  3 +++
 10 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 83dbbca..a5be34f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -82,7 +82,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
         return 0;
     }
 
-    public void prepareShutdown(boolean forced) {
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
         // noop
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 14e4372..c0970fb 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -109,7 +109,14 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     }
 
     @Override
-    public void prepareShutdown(boolean forced) {
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // if we are suspending then we want to keep the thread running but just not route the exchange
+        // this logic is only when we stop or shutdown the consumer
+        if (suspendOnly) {
+            LOG.debug("Skip preparing to shutdown as consumer is being suspended");
+            return;
+        }
+
         // signal we want to shutdown
         shutdownPending = true;
         forceShutdown = forced;

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
index 4d1395e..0657f15 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
@@ -184,7 +184,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
             Collections.reverse(routesOrdered);
         }
 
-        LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+        if (suspendOnly) {
+            LOG.info("Starting to graceful suspend " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+        } else {
+            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+        }
 
         // use another thread to perform the shutdowns so we can support timeout
         timeoutOccurred.set(false);
@@ -230,7 +234,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                     // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
                     for (RouteStartupOrder order : routes) {
                         for (Service service : order.getServices()) {
-                            prepareShutdown(service, true, true, isSuppressLoggingOnTimeout());
+                            prepareShutdown(service, false, true, true, isSuppressLoggingOnTimeout());
                         }
                     }
                 } else {
@@ -430,14 +434,14 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
     }
 
     /**
-     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
+     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} method
      * on the service if it implement this interface.
      *
      * @param service the service
      * @param forced  whether to force shutdown
      * @param includeChildren whether to prepare the child of the service as well
      */
-    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) {
+    private static void prepareShutdown(Service service, boolean suspendOnly, boolean forced, boolean includeChildren, boolean suppressLogging) {
         Set<Service> list;
         if (includeChildren) {
             // include error handlers as we want to prepare them for shutdown as well
@@ -451,7 +455,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
             if (child instanceof ShutdownPrepared) {
                 try {
                     LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
-                    ((ShutdownPrepared) child).prepareShutdown(forced);
+                    ((ShutdownPrepared) child).prepareShutdown(suspendOnly, forced);
                 } catch (Exception e) {
                     if (suppressLogging) {
                         LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
@@ -580,7 +584,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                     if (service instanceof Consumer) {
                         continue;
                     }
-                    prepareShutdown(service, false, true, false);
+                    prepareShutdown(service, suspendOnly, false, true, false);
                 }
             }
 
@@ -643,7 +647,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                     LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
                     boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
                     boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
-                    prepareShutdown(consumer, forced, false, suppress);
+                    prepareShutdown(consumer, suspendOnly, forced, false, suppress);
                     LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
                 }
             }
@@ -665,7 +669,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
                 for (Service service : order.getServices()) {
                     boolean forced = context.getShutdownStrategy().forceShutdown(service);
                     boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
-                    prepareShutdown(service, forced, true, suppress);
+                    prepareShutdown(service, suspendOnly, forced, true, suppress);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
index b16a5c4..a39263c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
@@ -77,7 +77,7 @@ public abstract class ScheduledBatchPollingConsumer extends ScheduledPollConsume
     }
 
     @Override
-    public void prepareShutdown(boolean forced) {
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
         // reset task as the state of the task is not to be preserved
         // which otherwise may cause isBatchAllowed() to return a wrong answer
         this.shutdownRunningTask = null;

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index d15d2a1..04784eb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -323,7 +323,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
     }
 
     @Override
-    public void prepareShutdown(boolean forced) {
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
         // prepare for shutdown, eg do not allow redelivery if configured
         log.trace("Prepare shutdown on error handler {}", this);
         preparingShutdown = true;

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 4e3403c..5c400f6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1381,7 +1381,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     @Override
-    public void prepareShutdown(boolean forced) {
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
         // we are shutting down, so force completion if this option was enabled
         // but only do this when forced=false, as that is when we have chance to
         // send out new messages to be routed by Camel. When forced=true, then

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
index 2721d7b..b6f1dd9 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
@@ -42,8 +42,9 @@ public interface ShutdownPrepared {
      * For forced shutdown, then the service is expected to aggressively shutdown any child services, such
      * as thread pools etc. This is the last chance it has to perform such duties.
      * 
-     * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown. 
+     * @param suspendOnly <tt>true</tt> if the intention is to only suspend the service, and not stop/shutdown the service.
+     * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown.
      */
-    void prepareShutdown(boolean forced);
+    void prepareShutdown(boolean suspendOnly, boolean forced);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
index 9eea103..a8c4c1a 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
@@ -41,6 +41,10 @@ public class DefaultCamelContextSuspendResumeRouteStartupOrderTest extends Conte
         resetMocks();
         mock.expectedMessageCount(0);
         context.suspend();
+
+        // need to give seda consumer thread time to idle
+        Thread.sleep(500);
+
         template.sendBody("seda:foo", "B");
         mock.assertIsSatisfied(1000);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
index fd4db6d..f7b6643 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
@@ -40,8 +40,14 @@ public class DefaultCamelContextSuspendResumeRouteTest extends ContextTestSuppor
         // now suspend and dont expect a message to be routed
         resetMocks();
         mock.expectedMessageCount(0);
+
         context.suspend();
+
+        // need to give seda consumer thread time to idle
+        Thread.sleep(500);
+
         template.sendBody("seda:foo", "B");
+
         mock.assertIsSatisfied(1000);
 
         assertTrue(context.isSuspended());

http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
index 1c36f48..5f9f4ea 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
@@ -44,6 +44,9 @@ public class TwoRouteSuspendResumeTest extends ContextTestSupport {
 
         context.suspendRoute("foo");
 
+        // need to give seda consumer thread time to idle
+        Thread.sleep(500);
+
         template.sendBody("seda:foo", "B");
         template.sendBody("direct:bar", "C");