You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/12/22 12:14:25 UTC
svn commit: r893138 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/bean/
Author: davsclaus
Date: Tue Dec 22 11:14:16 2009
New Revision: 893138
URL: http://svn.apache.org/viewvc?rev=893138&view=rev
Log:
CAMEL-1483: Improved graceful shutdown to suspend consumers if possible. Also avoid shutting down the shared producer pool, as that should only be done by CamelContext itself.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Dec 22 11:14:16 2009
@@ -142,7 +142,7 @@
private PackageScanClassResolver packageScanClassResolver;
// we use a capacity of 100 per endpoint, so for the same endpoint we have at most 100 producers in the pool
// so if we have 6 endpoints in the pool, we have 6 x 100 producers in total
- private ServicePool<Endpoint, Producer> producerServicePool = new DefaultProducerServicePool(100);
+ private ServicePool<Endpoint, Producer> producerServicePool = new SharedProducerServicePool(100);
private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory();
private Tracer defaultTracer;
private InflightRepository inflightRepository = new DefaultInflightRepository();
@@ -1104,7 +1104,12 @@
stopServices(components.values());
components.clear();
- stopServices(producerServicePool);
+ // special shutdown of a shared producer service pool as it should only be shutdown by camel context
+ if (producerServicePool instanceof SharedProducerServicePool) {
+ ((SharedProducerServicePool) producerServicePool).shutdown(this);
+ } else {
+ stopServices(producerServicePool);
+ }
stopServices(inflightRepository);
try {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Tue Dec 22 11:14:16 2009
@@ -41,7 +41,7 @@
}
protected void poll() throws Exception {
- while (true) {
+ while (isPollAllowed()) {
Exchange exchange = pollingConsumer.receiveNoWait();
if (exchange == null) {
break;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Tue Dec 22 11:14:16 2009
@@ -26,6 +26,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
+import org.apache.camel.SuspendableService;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.util.EventHelper;
@@ -160,6 +161,30 @@
}
}
+ /**
+ * Suspends the consumer immediately; a failure is logged and reported as an event, never rethrown.
+ *
+ * @param service the suspendable service on which {@code suspend()} is invoked
+ * @param consumer the consumer being suspended, used for logging and for the failure event
+ */
+ protected void suspendNow(SuspendableService service, Consumer consumer) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Suspending: " + consumer);
+ }
+
+ try {
+ service.suspend();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
+ // the failure is additionally reported through the service-stop-failure event
+ EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Suspend complete for: " + consumer);
+ }
+ }
+
private ExecutorService getExecutorService() {
if (executor == null) {
executor = ExecutorServiceHelper.newSingleThreadExecutor("ShutdownTask", true);
@@ -204,12 +229,22 @@
for (Consumer consumer : consumers) {
// some consumers do not support shutting down so let them decide
+ // if a consumer is suspendable then prefer to use that and then shutdown later
boolean shutdown = true;
+ boolean suspend = false;
if (consumer instanceof ShutdownAware) {
shutdown = ((ShutdownAware) consumer).deferShutdown();
+ } else if (consumer instanceof SuspendableService) {
+ shutdown = false;
+ suspend = true;
}
- if (shutdown) {
+ if (suspend) {
+ // only suspend it and then later shutdown it
+ suspendNow((SuspendableService) consumer, consumer);
+ // add it to the deferred list so the route will be shutdown later
+ deferredConsumers.add(consumer);
+ } else if (shutdown) {
shutdownNow(consumer);
} else {
// we will stop it later, but for now it must run to be able to help all inflight messages
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Dec 22 11:14:16 2009
@@ -89,7 +89,7 @@
try {
// eager assume we are done
done = true;
- if (isRunAllowed() && !isSuspended()) {
+ if (isPollAllowed()) {
if (retryCounter == -1) {
if (LOG.isTraceEnabled()) {
@@ -125,6 +125,11 @@
// Properties
// -------------------------------------------------------------------------
+ /** Polling is allowed only while this consumer is allowed to run and is not suspended. */
+ protected boolean isPollAllowed() {
+ return isRunAllowed() && !isSuspended();
+ }
+
public long getInitialDelay() {
return initialDelay;
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java?rev=893138&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java Tue Dec 22 11:14:16 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.EventHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A shared {@link org.apache.camel.impl.DefaultProducerServicePool} which is used by
+ * {@link org.apache.camel.CamelContext} by default.
+ *
+ * @version $Revision$
+ */
+public class SharedProducerServicePool extends DefaultProducerServicePool {
+
+ private static final transient Log LOG = LogFactory.getLog(SharedProducerServicePool.class);
+
+ public SharedProducerServicePool(int capacity) {
+ super(capacity);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // only let CamelContext stop it since its shared and should
+ // only be stopped when CamelContext stops
+ }
+
+ void shutdown(CamelContext context) throws Exception {
+ try {
+ super.doStop();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while stopping service: " + this + ". This exception will be ignored.");
+ // fire event
+ EventHelper.notifyServiceStopFailure(context, this, e);
+ }
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java?rev=893138&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java Tue Dec 22 11:14:16 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import javax.naming.Context;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BeanMethodHeartbeatTest;
+
+/** Runs a bean-consuming route into a delayed seda route and expects a message at mock:result.
+ * @version $Revision$
+ */
+public class BeanConsumerShutdownTest extends ContextTestSupport {
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ Context context = super.createJndiContext();
+ context.bind("service", new BeanMethodHeartbeatTest.MyService("service1"));
+ return context;
+ }
+
+ public void testHeartbeatsArrive() throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMinimumMessageCount(1);
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("bean:service?method=status").to("seda:heartbeats");
+
+ from("seda:heartbeats").delay(2000).to("mock:result");
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date