You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/12/22 12:14:25 UTC

svn commit: r893138 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/bean/

Author: davsclaus
Date: Tue Dec 22 11:14:16 2009
New Revision: 893138

URL: http://svn.apache.org/viewvc?rev=893138&view=rev
Log:
CAMEL-1483: Improved graceful shutdown to suspend consumers if possible. Also avoid shutting down shared producer pool as it should only be done by CamelContext itself.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue Dec 22 11:14:16 2009
@@ -142,7 +142,7 @@
     private PackageScanClassResolver packageScanClassResolver;
     // we use a capacity of 100 per endpoint, so for the same endpoint we have at most 100 producers in the pool
     // so if we have 6 endpoints in the pool, we have 6 x 100 producers in total
-    private ServicePool<Endpoint, Producer> producerServicePool = new DefaultProducerServicePool(100);
+    private ServicePool<Endpoint, Producer> producerServicePool = new SharedProducerServicePool(100);
     private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory();
     private Tracer defaultTracer;
     private InflightRepository inflightRepository = new DefaultInflightRepository();
@@ -1104,7 +1104,12 @@
         stopServices(components.values());
         components.clear();
 
-        stopServices(producerServicePool);
+        // special shutdown of a shared producer service pool as it should only be shutdown by camel context
+        if (producerServicePool instanceof SharedProducerServicePool) {
+            ((SharedProducerServicePool) producerServicePool).shutdown(this);
+        } else {
+            stopServices(producerServicePool);
+        }
         stopServices(inflightRepository);
 
         try {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumer.java Tue Dec 22 11:14:16 2009
@@ -41,7 +41,7 @@
     }
 
     protected void poll() throws Exception {
-        while (true) {
+        while (isPollAllowed()) {
             Exchange exchange = pollingConsumer.receiveNoWait();
             if (exchange == null) {
                 break;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java Tue Dec 22 11:14:16 2009
@@ -26,6 +26,7 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
+import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.util.EventHelper;
@@ -160,6 +161,30 @@
         }
     }
 
+    /**
+     * Suspends the consumer immediately.
+     *
+     * @param service the suspendable consumer
+     * @param consumer the consumer to suspend
+     */
+    protected void suspendNow(SuspendableService service, Consumer consumer) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Suspending: " + consumer);
+        }
+
+        try {
+            service.suspend();
+        } catch (Exception e) {
+            LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.");
+            // fire event
+            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Suspend complete for: " + consumer);
+        }
+    }
+
     private ExecutorService getExecutorService() {
         if (executor == null) {
             executor = ExecutorServiceHelper.newSingleThreadExecutor("ShutdownTask", true);
@@ -204,12 +229,22 @@
             for (Consumer consumer : consumers) {
 
                 // some consumers do not support shutting down so let them decide
+                // if a consumer is suspendable then prefer to use that and then shutdown later
                 boolean shutdown = true;
+                boolean suspend = false;
                 if (consumer instanceof ShutdownAware) {
                     shutdown = ((ShutdownAware) consumer).deferShutdown();
+                } else if (consumer instanceof SuspendableService) {
+                    shutdown = false;
+                    suspend = true;
                 }
 
-                if (shutdown) {
+                if (suspend) {
+                    // only suspend it and then later shutdown it
+                    suspendNow((SuspendableService) consumer, consumer);
+                    // add it to the deferred list so the route will be shutdown later
+                    deferredConsumers.add(consumer);
+                } else if (shutdown) {
                     shutdownNow(consumer);
                 } else {
                     // we will stop it later, but for now it must run to be able to help all inflight messages

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=893138&r1=893137&r2=893138&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Tue Dec 22 11:14:16 2009
@@ -89,7 +89,7 @@
             try {
                 // eager assume we are done
                 done = true;
-                if (isRunAllowed() && !isSuspended()) {
+                if (isPollAllowed()) {
 
                     if (retryCounter == -1) {
                         if (LOG.isTraceEnabled()) {
@@ -125,6 +125,11 @@
 
     // Properties
     // -------------------------------------------------------------------------
+
+    protected boolean isPollAllowed() {
+        return isRunAllowed() && !isSuspended();
+    }
+
     public long getInitialDelay() {
         return initialDelay;
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java?rev=893138&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java Tue Dec 22 11:14:16 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.EventHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A shared {@link org.apache.camel.impl.DefaultProducerServicePool} which is used by
+ * {@link org.apache.camel.CamelContext} by default.
+ *
+ * @version $Revision$
+ */
+public class SharedProducerServicePool extends DefaultProducerServicePool {
+
+    private static final transient Log LOG = LogFactory.getLog(SharedProducerServicePool.class);
+
+    public SharedProducerServicePool(int capacity) {
+        super(capacity);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // only let CamelContext stop it since its shared and should
+        // only be stopped when CamelContext stops
+    }
+
+    void shutdown(CamelContext context) throws Exception {
+        try {
+            super.doStop();
+        } catch (Exception e) {
+            LOG.warn("Error occurred while stopping service: " + this + ". This exception will be ignored.");
+            // fire event
+            EventHelper.notifyServiceStopFailure(context, this, e);
+        }
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/SharedProducerServicePool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java?rev=893138&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java Tue Dec 22 11:14:16 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import javax.naming.Context;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BeanMethodHeartbeatTest;
+
+/**
+ * @version $Revision$
+ */
+public class BeanConsumerShutdownTest extends ContextTestSupport {
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        Context context = super.createJndiContext();
+        context.bind("service", new BeanMethodHeartbeatTest.MyService("service1"));
+        return context;
+    }
+
+    public void testHeartbeatsArrive() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMinimumMessageCount(1);
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("bean:service?method=status").to("seda:heartbeats");
+
+                from("seda:heartbeats").delay(2000).to("mock:result");
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanConsumerShutdownTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date