You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/17 16:57:36 UTC
svn commit: r576463 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/converter/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/util/ ca...
Author: chirino
Date: Mon Sep 17 07:57:34 2007
New Revision: 576463
URL: http://svn.apache.org/viewvc?rev=576463&view=rev
Log:
Fixed several bugs in the Async interface implementations. Also got rid of the exchange.throwException() method
since it's not really needed.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRouteTest.java
activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java
activemq/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Sep 17 07:57:34 2007
@@ -136,11 +136,6 @@
void setException(Throwable e);
/**
- * Throws the exception associated with this exchange.
- */
- void throwException() throws Exception;
-
- /**
* Returns true if this exchange failed due to either an exception or fault
*
* @see Exchange#getException()
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java Mon Sep 17 07:57:34 2007
@@ -46,7 +46,7 @@
exchange.setException(e);
}
// false means processing of the exchange asynchronously,
- callback.done(false);
+ callback.done(true);
return true;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java Mon Sep 17 07:57:34 2007
@@ -36,7 +36,7 @@
return "Catch[" + exceptions + " -> " + getProcessor() + "]";
}
- public boolean catches(Exception e) {
+ public boolean catches(Throwable e) {
for (Class type : exceptions) {
if (type.isInstance(e)) {
return true;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Mon Sep 17 07:57:34 2007
@@ -49,6 +49,7 @@
}
private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
+ private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName()+".FAILURE_HANDLED";
private Processor output;
private Processor deadLetter;
private AsyncProcessor outputAsync;
@@ -102,6 +103,7 @@
}
if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ setFailureHandled(exchange, true);
AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
return afp.process(exchange, callback);
}
@@ -129,8 +131,9 @@
// It is going to be processed async..
return false;
}
- if (exchange.getException() == null) {
+ if (exchange.getException() == null || isFailureHandled(exchange) ) {
// If everything went well.. then we exit here..
+ callback.done(true);
return true;
}
// error occured so loop back around.....
@@ -138,6 +141,15 @@
}
+ private boolean isFailureHandled(Exchange exchange) {
+ Boolean rc = exchange.getProperty(FAILURE_HANDLED_PROPERTY, Boolean.class);
+ return rc == null ? false : rc;
+ }
+
+ private void setFailureHandled(Exchange exchange, boolean b) {
+ exchange.setProperty(FAILURE_HANDLED_PROPERTY, b ? Boolean.TRUE : Boolean.FALSE );
+ }
+
public void process(Exchange exchange) throws Exception {
int redeliveryCounter = 0;
long redeliveryDelay = 0;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Mon Sep 17 07:57:34 2007
@@ -22,6 +22,7 @@
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,71 +56,34 @@
return new Pipeline(processors);
}
- public void process(Exchange exchange) throws Exception {
- Exchange nextExchange = exchange;
- boolean first = true;
- for (Processor producer : getProcessors()) {
- // lets break out of the pipeline if we have a failure
- if (nextExchange.isFailed()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
- }
- break;
- }
- if (first) {
- first = false;
- } else {
- nextExchange = createNextExchange(producer, nextExchange);
- }
- producer.process(nextExchange);
- }
- ExchangeHelper.copyResults(exchange, nextExchange);
- }
- /**
- * It would be nice if we could implement the sync process method as follows.. but we
- * can't since the dead letter handler seem to like to handle the error but still
- * set the Exchange.exception field. When that happens this method throws that
- * exception but it seem that folks don't expect to get that exception.
- *
- * @param exchange
- * @throws Exception thx
- */
- public void xprocess(Exchange exchange) throws Exception {
- // This could become a base class method for an AsyncProcessor
- final CountDownLatch latch = new CountDownLatch(1);
- if (!process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- if (sync) {
- return;
- }
- latch.countDown();
- }
- })) {
- latch.await();
- }
- // If there was an exception associated with the exchange, throw it.
- exchange.throwException();
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
}
public boolean process(Exchange original, AsyncCallback callback) {
Iterator<Processor> processors = getProcessors().iterator();
Exchange nextExchange = original;
boolean first = true;
- while (processors.hasNext()) {
- AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
+ while (true) {
if (nextExchange.isFailed()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
}
break;
}
+ if (!processors.hasNext()) {
+ break;
+ }
+
+ AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+
if (first) {
first = false;
} else {
- nextExchange = createNextExchange(processor, original);
+ nextExchange = createNextExchange(processor, nextExchange);
}
+
boolean sync = process(original, nextExchange, callback, processors, processor);
// Continue processing the pipeline synchronously ...
if (!sync) {
@@ -140,30 +104,31 @@
public void done(boolean sync) {
// We only have to handle async completion of
- // the pipeline..
- if( sync ) {
+ // the pipeline..
+ if (sync) {
return;
}
- // Continue processing the pipeline...
+ // Continue processing the pipeline...
Exchange nextExchange = exchange;
- while( processors.hasNext() ) {
+ while (processors.hasNext()) {
AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
-
+
if (nextExchange.isFailed()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " + nextExchange.getFault(false));
+ LOG.debug("Mesage exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: "
+ + nextExchange.getFault(false));
}
break;
}
nextExchange = createNextExchange(processor, exchange);
- sync = process( original, nextExchange, callback, processors, processor);
- if( !sync ) {
+ sync = process(original, nextExchange, callback, processors, processor);
+ if (!sync) {
return;
}
}
-
+
ExchangeHelper.copyResults(original, nextExchange);
callback.done(true);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Mon Sep 17 07:57:34 2007
@@ -20,6 +20,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -49,25 +50,29 @@
}
public void process(Exchange exchange) throws Exception {
- boolean doneTry = false;
+ Throwable e = null;
try {
tryProcessor.process(exchange);
- doneTry = true;
-
- if (finallyProcessor != null) {
- finallyProcessor.process(exchange);
- }
- } catch (Exception e) {
- handleException(exchange, e);
-
- if (!doneTry && finallyProcessor != null) {
- try {
- finallyProcessor.process(exchange);
- } catch (Exception e2) {
- LOG.warn("Caught exception in finally block while handling other exception: " + e2, e2);
- }
+ e = exchange.getException();
+ } catch (Exception ex) {
+ e = ex;
+ }
+
+ if (e != null) {
+ try {
+ handleException(exchange, e);
+ } catch (Exception ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw new RuntimeCamelException(ex);
}
}
+
+ try {
+ finallyProcessor.process(exchange);
+ } catch (Exception e2) {
+ LOG.warn("Caught exception in finally block while handling other exception: " + e2, e2);
+ }
}
protected void doStart() throws Exception {
@@ -78,7 +83,7 @@
ServiceHelper.stopServices(tryProcessor, catchClauses, finallyProcessor);
}
- protected void handleException(Exchange exchange, Exception e) throws Exception {
+ protected void handleException(Exchange exchange, Throwable e) throws Throwable {
for (CatchProcessor catchClause : catchClauses) {
if (catchClause.catches(e)) {
// lets attach the exception to the exchange
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java?rev=576463&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java Mon Sep 17 07:57:34 2007
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+
+/**
+ * Helper methods for AsyncProcessor objects.
+ */
+public final class AsyncProcessorHelper {
+
+ private AsyncProcessorHelper() {
+ }
+
+ /**
+ * Calls the async version of the processor's process method and waits
+ * for it to complete before returning. This can be used by AsyncProcessor
+ * objects to implement their sync version of the process method.
+ *
+ * @param processor the processor whose process(exchange, callback) method is invoked
+ * @param exchange the exchange handed to the processor
+ * @throws Exception e.g. InterruptedException if interrupted while awaiting the latch
+ */
+ public static void process(AsyncProcessor processor, Exchange exchange) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ boolean sync = processor.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ if (!sync) {
+ latch.countDown();
+ }
+ }
+ });
+ if (!sync) {
+ latch.await();
+ }
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java Mon Sep 17 07:57:34 2007
@@ -16,6 +16,8 @@
*/
package org.apache.camel;
+import java.io.File;
+
import javax.naming.Context;
import org.apache.camel.builder.RouteBuilder;
@@ -245,5 +247,15 @@
protected void assertValidContext(CamelContext context) {
assertNotNull("No context found!", context);
+ }
+
+ protected static void recursiveDelete(File file) {
+ if (file.isDirectory()) {
+ File[] files = file.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ recursiveDelete(files[i]);
+ }
+ }
+ file.delete();
}
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileRenameRouteTest.java Mon Sep 17 07:57:34 2007
@@ -16,12 +16,15 @@
*/
package org.apache.camel.component.file;
+import java.io.File;
+
/**
* @version $Revision: 1.1 $
*/
public class FileRenameRouteTest extends FileRouteTest {
@Override
protected void setUp() throws Exception {
+ recursiveDelete(new File("target/test-rename-inbox"));
uri = "file:target/test-rename-inbox?moveNamePrefix=foo/";
super.setUp();
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRouteTest.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRouteTest.java Mon Sep 17 07:57:34 2007
@@ -75,7 +75,7 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:in").beanRef("myBean").to("seda:out");
+ from("direct:in").beanRef("myBean");
}
};
}
Modified: activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java (original)
+++ activemq/camel/trunk/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfRouterTest.java Mon Sep 17 07:57:34 2007
@@ -32,6 +32,9 @@
import org.apache.cxf.frontend.ServerFactoryBean;
import org.apache.cxf.interceptor.LoggingInInterceptor;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.core.CollectionFactory;
+
import junit.framework.TestCase;
public class CxfRouterTest extends ContextTestSupport {
Modified: activemq/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=576463&r1=576462&r2=576463&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original)
+++ activemq/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Mon Sep 17 07:57:34 2007
@@ -102,7 +102,6 @@
}
});
latch.await();
- exchange.throwException();
}
public boolean process(final Exchange exchange, final AsyncCallback callback) {