You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/09/22 10:27:56 UTC
[5/8] camel git commit: CAMEL-9150: Seda suspend/resume should not
trigger start/stop logic
CAMEL-9150: Seda suspend/resume should not trigger start/stop logic
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e75d37b3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e75d37b3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e75d37b3
Branch: refs/heads/master
Commit: e75d37b3918eb957924e5122672accc6c01c72fc
Parents: 20ec49a
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 22 09:52:37 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 22 09:52:37 2015 +0200
----------------------------------------------------------------------
.../camel/component/direct/DirectConsumer.java | 2 +-
.../camel/component/seda/SedaConsumer.java | 9 ++++++++-
.../camel/impl/DefaultShutdownStrategy.java | 20 ++++++++++++--------
.../impl/ScheduledBatchPollingConsumer.java | 2 +-
.../camel/processor/RedeliveryErrorHandler.java | 2 +-
.../processor/aggregate/AggregateProcessor.java | 2 +-
.../org/apache/camel/spi/ShutdownPrepared.java | 5 +++--
...ntextSuspendResumeRouteStartupOrderTest.java | 4 ++++
...faultCamelContextSuspendResumeRouteTest.java | 6 ++++++
.../camel/impl/TwoRouteSuspendResumeTest.java | 3 +++
10 files changed, 40 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 83dbbca..a5be34f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -82,7 +82,7 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
return 0;
}
- public void prepareShutdown(boolean forced) {
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
// noop
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 14e4372..c0970fb 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -109,7 +109,14 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
}
@Override
- public void prepareShutdown(boolean forced) {
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // if we are suspending then we want to keep the thread running but just not route the exchange
+ // this logic is only when we stop or shutdown the consumer
+ if (suspendOnly) {
+ LOG.debug("Skip preparing to shutdown as consumer is being suspended");
+ return;
+ }
+
// signal we want to shutdown
shutdownPending = true;
forceShutdown = forced;
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
index 4d1395e..0657f15 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
@@ -184,7 +184,11 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
Collections.reverse(routesOrdered);
}
- LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+ if (suspendOnly) {
+ LOG.info("Starting to graceful suspend " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+ } else {
+ LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
+ }
// use another thread to perform the shutdowns so we can support timeout
timeoutOccurred.set(false);
@@ -230,7 +234,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
// now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
for (RouteStartupOrder order : routes) {
for (Service service : order.getServices()) {
- prepareShutdown(service, true, true, isSuppressLoggingOnTimeout());
+ prepareShutdown(service, false, true, true, isSuppressLoggingOnTimeout());
}
}
} else {
@@ -430,14 +434,14 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
}
/**
- * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
+ * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} method
* on the service if it implement this interface.
*
* @param service the service
* @param forced whether to force shutdown
* @param includeChildren whether to prepare the child of the service as well
*/
- private static void prepareShutdown(Service service, boolean forced, boolean includeChildren, boolean suppressLogging) {
+ private static void prepareShutdown(Service service, boolean suspendOnly, boolean forced, boolean includeChildren, boolean suppressLogging) {
Set<Service> list;
if (includeChildren) {
// include error handlers as we want to prepare them for shutdown as well
@@ -451,7 +455,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
if (child instanceof ShutdownPrepared) {
try {
LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
- ((ShutdownPrepared) child).prepareShutdown(forced);
+ ((ShutdownPrepared) child).prepareShutdown(suspendOnly, forced);
} catch (Exception e) {
if (suppressLogging) {
LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
@@ -580,7 +584,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
if (service instanceof Consumer) {
continue;
}
- prepareShutdown(service, false, true, false);
+ prepareShutdown(service, suspendOnly, false, true, false);
}
}
@@ -643,7 +647,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
- prepareShutdown(consumer, forced, false, suppress);
+ prepareShutdown(consumer, suspendOnly, forced, false, suppress);
LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
}
}
@@ -665,7 +669,7 @@ public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownS
for (Service service : order.getServices()) {
boolean forced = context.getShutdownStrategy().forceShutdown(service);
boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
- prepareShutdown(service, forced, true, suppress);
+ prepareShutdown(service, suspendOnly, forced, true, suppress);
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
index b16a5c4..a39263c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ScheduledBatchPollingConsumer.java
@@ -77,7 +77,7 @@ public abstract class ScheduledBatchPollingConsumer extends ScheduledPollConsume
}
@Override
- public void prepareShutdown(boolean forced) {
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
// reset task as the state of the task is not to be preserved
// which otherwise may cause isBatchAllowed() to return a wrong answer
this.shutdownRunningTask = null;
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
index d15d2a1..04784eb 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
@@ -323,7 +323,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
}
@Override
- public void prepareShutdown(boolean forced) {
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
// prepare for shutdown, eg do not allow redelivery if configured
log.trace("Prepare shutdown on error handler {}", this);
preparingShutdown = true;
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 4e3403c..5c400f6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1381,7 +1381,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
}
@Override
- public void prepareShutdown(boolean forced) {
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
// we are shutting down, so force completion if this option was enabled
// but only do this when forced=false, as that is when we have chance to
// send out new messages to be routed by Camel. When forced=true, then
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
index 2721d7b..b6f1dd9 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/ShutdownPrepared.java
@@ -42,8 +42,9 @@ public interface ShutdownPrepared {
* For forced shutdown, then the service is expected to aggressively shutdown any child services, such
* as thread pools etc. This is the last chance it has to perform such duties.
*
- * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown.
+ * @param suspendOnly <tt>true</tt> if the intention is to only suspend the service, and not stop/shutdown the service.
+ * @param forced <tt>true</tt> is forcing a more aggressive shutdown, <tt>false</tt> is for preparing to shutdown.
*/
- void prepareShutdown(boolean forced);
+ void prepareShutdown(boolean suspendOnly, boolean forced);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
index 9eea103..a8c4c1a 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteStartupOrderTest.java
@@ -41,6 +41,10 @@ public class DefaultCamelContextSuspendResumeRouteStartupOrderTest extends Conte
resetMocks();
mock.expectedMessageCount(0);
context.suspend();
+
+ // need to give seda consumer thread time to idle
+ Thread.sleep(500);
+
template.sendBody("seda:foo", "B");
mock.assertIsSatisfied(1000);
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
index fd4db6d..f7b6643 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
@@ -40,8 +40,14 @@ public class DefaultCamelContextSuspendResumeRouteTest extends ContextTestSuppor
// now suspend and dont expect a message to be routed
resetMocks();
mock.expectedMessageCount(0);
+
context.suspend();
+
+ // need to give seda consumer thread time to idle
+ Thread.sleep(500);
+
template.sendBody("seda:foo", "B");
+
mock.assertIsSatisfied(1000);
assertTrue(context.isSuspended());
http://git-wip-us.apache.org/repos/asf/camel/blob/e75d37b3/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
index 1c36f48..5f9f4ea 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
@@ -44,6 +44,9 @@ public class TwoRouteSuspendResumeTest extends ContextTestSupport {
context.suspendRoute("foo");
+ // need to give seda consumer thread time to idle
+ Thread.sleep(500);
+
template.sendBody("seda:foo", "B");
template.sendBody("direct:bar", "C");