You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/10 10:27:04 UTC
svn commit: r813303 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/processor/
components/camel-spring/src/main/java/org/apache/camel/spring/
components/camel-spring/src/test/java/org/apache/...
Author: davsclaus
Date: Thu Sep 10 08:27:04 2009
New Revision: 813303
URL: http://svn.apache.org/viewvc?rev=813303&view=rev
Log:
CAMEL-1963: Fixed a race condition when stopping Camel that was most apparent when using camel-jms with JMS listeners. Fixed the Camel main app to throw an exception if startup fails for some reason.
Added:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java (with props)
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml
- copied, changed from r812993, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Thu Sep 10 08:27:04 2009
@@ -26,6 +26,7 @@
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.Context;
import org.apache.camel.CamelContext;
@@ -95,6 +96,7 @@
private boolean routeDefinitionInitiated;
private String name;
private final Map<String, Endpoint> endpoints = new LRUCache<String, Endpoint>(1000);
+ private final AtomicInteger endpointKeyCounter = new AtomicInteger();
private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
private final Map<String, Component> components = new HashMap<String, Component>();
private List<Route> routes;
@@ -1192,17 +1194,12 @@
disableJMX = true;
}
- protected synchronized String getEndpointKey(String uri, Endpoint endpoint) {
+ protected String getEndpointKey(String uri, Endpoint endpoint) {
if (endpoint.isSingleton()) {
return uri;
} else {
- // lets try find the first endpoint key which is free
- for (int counter = 0; true; counter++) {
- String key = (counter > 0) ? uri + ":" + counter : uri;
- if (!endpoints.containsKey(key)) {
- return key;
- }
- }
+ int counter = endpointKeyCounter.incrementAndGet();
+ return uri + ":" + counter;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/MainSupport.java Thu Sep 10 08:27:04 2009
@@ -107,10 +107,12 @@
/**
* Runs this process with the given arguments
*/
- public void run() {
+ public void run() throws Exception {
if (!completed.get()) {
+ // if we have an issue starting then propagate the exception to the caller
+ start();
try {
- start();
+ // while running then just log errors
waitUntilCompleted();
stop();
} catch (Exception e) {
@@ -265,7 +267,7 @@
/**
* Parses the command line arguments then runs the program
*/
- public void run(String[] args) {
+ public void run(String[] args) throws Exception {
parseArguments(args);
run();
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ServiceSupport.java Thu Sep 10 08:27:04 2009
@@ -152,7 +152,7 @@
*
* @return true if the service should continue to run.
*/
- protected boolean isRunAllowed() {
+ public boolean isRunAllowed() {
return !(stopping.get() || stopped.get());
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Thu Sep 10 08:27:04 2009
@@ -106,7 +106,7 @@
// logging nextExchange as it contains the exchange that might have altered the payload and since
// we are logging the completion if will be confusing if we log the original instead
// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
- LOG.trace("Processing compelete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
+ LOG.trace("Processing complete for exchangeId: " + exchange.getExchangeId() + " >>> " + nextExchange);
}
// copy results back to the original exchange
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Thu Sep 10 08:27:04 2009
@@ -18,11 +18,13 @@
import java.util.concurrent.RejectedExecutionException;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
@@ -94,15 +96,9 @@
*/
protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) throws Exception {
while (true) {
- // we can't keep retrying if the route is being shutdown.
- if (!isRunAllowed()) {
- if (log.isDebugEnabled()) {
- log.debug("Rejected execution as we are not started for exchange: " + exchange);
- }
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- return;
- }
+ // we can't keep retrying if the route is being shutdown
+ if (!isRunAllowed(exchange)) {
+ return;
}
// do not handle transacted exchanges that failed as this error handler does not support it
@@ -152,6 +148,11 @@
continue;
}
+ // do a sanity check whether we are still allowed to run before continuing as we just woke up
+ if (!isRunAllowed(exchange)) {
+ return;
+ }
+
// letting onRedeliver be executed
deliverToRedeliveryProcessor(exchange, data);
}
@@ -173,6 +174,45 @@
}
/**
+ * Strategy to detect if we are allowed to run
+ * <p/>
+ * This implementation detects if Camel is shutting down as well
+ *
+ * @param exchange the exchange
+ * @return <tt>true</tt> if we can process the exchange, <tt>false</tt> to stop processing it
+ */
+ protected boolean isRunAllowed(Exchange exchange) {
+ // check if camel is stopping
+ boolean stoppingCamel = false;
+ CamelContext context = exchange.getContext();
+ if (context instanceof ServiceSupport) {
+ stoppingCamel = !((ServiceSupport) context).isRunAllowed();
+ }
+
+ if (stoppingCamel || !isRunAllowed()) {
+
+ if (log.isDebugEnabled()) {
+ boolean stopping = isStopping() || isStopped();
+ if (stopping) {
+ log.debug("Rejected execution as we are stopping for exchange: " + exchange);
+ } else {
+ log.debug("Rejected execution as we are not started for exchange: " + exchange);
+ }
+ }
+
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+
+ // and stop routing the exchange any further
+ exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* Strategy whether the exchange has an exception that we should try to handle.
* <p/>
* Standard implementations should just look for an exception.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Thu Sep 10 08:27:04 2009
@@ -143,7 +143,7 @@
* @param redeliveryDelay previous redelivery delay
* @param redeliveryCounter number of previous redelivery attempts
* @return the calculate delay
- * @throws InterruptedException is thrown if the sleep is interruped likely because of shutdown
+ * @throws InterruptedException is thrown if the sleep is interrupted likely because of shutdown
*/
public long sleep(long redeliveryDelay, int redeliveryCounter) throws InterruptedException {
redeliveryDelay = calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java?rev=813303&r1=813302&r2=813303&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java (original)
+++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/Main.java Thu Sep 10 08:27:04 2009
@@ -81,7 +81,7 @@
@Override
public void run() {
- log.info("Recieved hang up - stopping the main instance.");
+ log.info("Received hang up - stopping the main instance.");
try {
mainInstance.stop();
} catch (Exception ex) {
@@ -90,7 +90,7 @@
}
}
- public static void main(String... args) {
+ public static void main(String... args) throws Exception {
Main main = new Main();
instance = main;
main.enableHangupSupport();
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java?rev=813303&view=auto
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java (added)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java Thu Sep 10 08:27:04 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.issues;
+
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.TestSupport;
+import org.apache.camel.spring.Main;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class SpringMainStartFailedIssueTest extends TestSupport {
+
+ @Test
+ public void testStartupFailed() throws Exception {
+ Main main = new Main();
+
+ String[] args = new String[]{"-ac", "org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml"};
+ try {
+ main.run(args);
+ fail("Should have thrown an exception");
+ } catch (ResolveEndpointFailedException e) {
+ // expected
+ }
+
+ assertNull("Spring application context should NOT be created", main.getApplicationContext());
+ }
+}
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml (from r812993, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml&r1=812993&r2=813303&rev=813303&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/stringDataFormatTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/SpringMainStartFailedIssueTest.xml Thu Sep 10 08:27:04 2009
@@ -22,26 +22,12 @@
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <bean id="xs" class="org.apache.camel.model.dataformat.StringDataFormat">
- <property name="charset" value="utf-8"/>
- </bean>
-
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<route>
- <from uri="direct:marshal"/>
- <!-- using a bean id -->
- <marshal ref="xs"/>
- <to uri="mock:marshal"/>
+ <from uri="xxx:unknown"/>
+ <to uri="mock:foo"/>
</route>
- <route>
- <from uri="direct:unmarshal"/>
- <!-- using a child node -->
- <unmarshal>
- <string charset="UTF-8"/>
- </unmarshal>
- <to uri="mock:unmarshal"/>
- </route>
</camelContext>
</beans>