You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/10 10:27:04 UTC

svn commit: r813303 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ components/camel-spring/src/main/java/org/apache/camel/spring/ components/camel-spring/src/test/java/org/apache/...

Author: davsclaus
Date: Thu Sep 10 08:27:04 2009
New Revision: 813303

URL: http://svn.apache.org/viewvc?rev=813303&view=rev
Log:
CAMEL-1963: Fixed a race condition when stopping Camel that was most apparent when using camel-jms with JMS listeners. Fixed the Camel main app to throw an exception if startup fails for some reason.

Added:
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java   (with props)
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml
      - copied, changed from r812993, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
    camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Thu Sep 10 08:27:04 2009
@@ -26,6 +26,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.naming.Context;
 
 import org.apache.camel.CamelContext;
@@ -95,6 +96,7 @@
     private boolean routeDefinitionInitiated;
     private String name;  
     private final Map<String, Endpoint> endpoints = new LRUCache<String, Endpoint>(1000);
+    private final AtomicInteger endpointKeyCounter = new AtomicInteger();
     private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
     private final Map<String, Component> components = new HashMap<String, Component>();
     private List<Route> routes;
@@ -1192,17 +1194,12 @@
         disableJMX = true;
     }
 
-    protected synchronized String getEndpointKey(String uri, Endpoint endpoint) {
+    protected String getEndpointKey(String uri, Endpoint endpoint) {
         if (endpoint.isSingleton()) {
             return uri;
         } else {
-            // lets try find the first endpoint key which is free
-            for (int counter = 0; true; counter++) {
-                String key = (counter > 0) ? uri + ":" + counter : uri;
-                if (!endpoints.containsKey(key)) {
-                    return key;
-                }
-            }
+            int counter = endpointKeyCounter.incrementAndGet();
+            return uri + ":" + counter;
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java Thu Sep 10 08:27:04 2009
@@ -107,10 +107,12 @@
     /**
      * Runs this process with the given arguments
      */
-    public void run() {
+    public void run() throws Exception {
         if (!completed.get()) {
+            // if we have an issue starting then propagate the exception to the caller
+            start();
             try {
-                start();
+                // while running then just log errors
                 waitUntilCompleted();
                 stop();
             } catch (Exception e) {
@@ -265,7 +267,7 @@
     /**
      * Parses the command line arguments then runs the program
      */
-    public void run(String[] args) {
+    public void run(String[] args) throws Exception {
         parseArguments(args);
         run();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Thu Sep 10 08:27:04 2009
@@ -152,7 +152,7 @@
      *
      * @return true if the service should continue to run.
      */
-    protected boolean isRunAllowed() {
+    public boolean isRunAllowed() {
         return !(stopping.get() || stopped.get());
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Thu Sep 10 08:27:04 2009
@@ -106,7 +106,7 @@
             // logging nextExchange as it contains the exchange that might have altered the payload and since
             // we are logging the completion if will be confusing if we log the original instead
             // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
-            LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
+            LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
         }
 
         // copy results back to the original exchange

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Sep 10 08:27:04 2009
@@ -18,11 +18,13 @@
 
 import java.util.concurrent.RejectedExecutionException;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -94,15 +96,9 @@
      */
     protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) throws Exception {
         while (true) {
-            // we can't keep retrying if the route is being shutdown.
-            if (!isRunAllowed()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Rejected execution as we are not started for exchange: " + exchange);
-                }
-                if (exchange.getException() == null) {
-                    exchange.setException(new RejectedExecutionException());
-                    return;
-                }
+            // we can't keep retrying if the route is being shutdown
+            if (!isRunAllowed(exchange)) {
+                return;
             }
 
             // do not handle transacted exchanges that failed as this error handler does not support it
@@ -152,6 +148,11 @@
                     continue;
                 }
 
+                // do a sanity check whether we are still allowed to run before continuing as we just woke up
+                if (!isRunAllowed(exchange)) {
+                    return;
+                }
+
                 // letting onRedeliver be executed
                 deliverToRedeliveryProcessor(exchange, data);
             }
@@ -173,6 +174,45 @@
     }
 
     /**
+     * Strategy to detect if we are allowed to run
+     * <p/>
+     * This implementation detects if Camel is shutting down as well
+     *
+     * @param exchange  the exchange
+     * @return <tt>true</tt> if we can process the exchange, <tt>false</tt> to stop processing it
+     */
+    protected boolean isRunAllowed(Exchange exchange) {
+        // check if camel is stopping
+        boolean stoppingCamel = false;
+        CamelContext context = exchange.getContext();
+        if (context instanceof ServiceSupport) {
+            stoppingCamel = !((ServiceSupport) context).isRunAllowed();
+        }
+
+        if (stoppingCamel || !isRunAllowed()) {
+
+            if (log.isDebugEnabled()) {
+                boolean stopping = isStopping() || isStopped();
+                if (stopping) {
+                    log.debug("Rejected execution as we are stopping for exchange: " + exchange);
+                } else {
+                    log.debug("Rejected execution as we are not started for exchange: " + exchange);
+                }
+            }
+
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+
+            // and stop continue routing
+            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
      * Strategy whether the exchange has an exception that we should try to handle.
      * <p/>
      * Standard implementations should just look for an exception.

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Thu Sep 10 08:27:04 2009
@@ -143,7 +143,7 @@
      * @param redeliveryDelay  previous redelivery delay
      * @param redeliveryCounter  number of previous redelivery attempts
      * @return the calculate delay
-     * @throws InterruptedException is thrown if the sleep is interruped likely because of shutdown
+     * @throws InterruptedException is thrown if the sleep is interrupted likely because of shutdown
      */
     public long sleep(long redeliveryDelay, int redeliveryCounter) throws InterruptedException {
         redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);

Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java Thu Sep 10 08:27:04 2009
@@ -81,7 +81,7 @@
 
         @Override
         public void run() {
-            log.info("Recieved hang up - stopping the main instance.");
+            log.info("Received hang up - stopping the main instance.");
             try {
                 mainInstance.stop();
             } catch (Exception ex) {
@@ -90,7 +90,7 @@
         }
     }
     
-    public static void main(String... args) {
+    public static void main(String... args) throws Exception {
         Main main = new Main();
         instance = main;
         main.enableHangupSupport();

Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java?rev=813303&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java Thu Sep 10 08:27:04 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.issues;
+
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.TestSupport;
+import org.apache.camel.spring.Main;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class SpringMainStartFailedIssueTest extends TestSupport {
+
+    @Test
+    public void testStartupFailed() throws Exception {
+        Main main = new Main();
+
+        String[] args = new String[]{"-ac", "org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml"};
+        try {
+            main.run(args);
+            fail("Should have thrown an exception");
+        } catch (ResolveEndpointFailedException e) {
+            // expected
+        }
+
+        assertNull("Spring application context should NOT be created", main.getApplicationContext());
+    }
+}

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml (from r812993, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml&r1=812993&r2=813303&rev=813303&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml Thu Sep 10 08:27:04 2009
@@ -22,26 +22,12 @@
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
 
-    <bean id="xs" class="org.apache.camel.model.dataformat.StringDataFormat">
-        <property name="charset" value="utf-8"/>
-    </bean>
-
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
-            <from uri="direct:marshal"/>
-            <!-- using a bean id -->
-            <marshal ref="xs"/>
-            <to uri="mock:marshal"/>
+            <from uri="xxx:unknown"/>
+            <to uri="mock:foo"/>
         </route>
 
-        <route>
-            <from uri="direct:unmarshal"/>
-            <!-- using a child node -->
-            <unmarshal>
-                <string charset="UTF-8"/>
-            </unmarshal>
-            <to uri="mock:unmarshal"/>
-        </route>
     </camelContext>
 
 </beans>