You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/01 11:39:29 UTC
svn commit: r770599 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/component/direct/
main/java/org/apache/camel/component/file/
main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/
main/j...
Author: davsclaus
Date: Fri May 1 09:39:28 2009
New Revision: 770599
URL: http://svn.apache.org/viewvc?rev=770599&view=rev
Log:
CAMEL-1572: Internal API cleanup of AsyncProcessor.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Fri May 1 09:39:28 2009
@@ -19,17 +19,24 @@
/**
* The callback interface for an {@link AsyncProcessor} so that it can
- * notify you when an {@link Exchange} has completed.
+ * notify you when an {@link Exchange} has completed.
+ *
+ * @deprecated a new async API is planned for Camel 2.0
*/
public interface AsyncCallback {
/**
* This method is invoked once the Exchange is completed. If an error
* occurred while processing the exchange, the exception field of the
- * {@link Exchange} being processed will hold the error.
+ * {@link Exchange} being processed will hold the error.
+ * <p/>
+ * This callback reports back twice:
+ * - the first time when the caller thread is done, i.e. the synchronous part of the processing is done.
+ * - the second time when the asynchronous thread is done and thus the {@link Exchange} is really complete.
*
- * @param doneSynchronously set to true if the processing of the exchange was completed synchronously thread.
+ * @param doneSynchronously set to <tt>true</tt> if the processing of the exchange was completed in the
+ * synchronous (caller) thread. Set to <tt>false</tt> when the asynchronous thread has completed.
*/
- void done(boolean doneSynchronously);
+ void done(boolean doneSynchronously);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Fri May 1 09:39:28 2009
@@ -26,6 +26,7 @@
* method.
*
* @version $Revision$
+ * @deprecated a new async API is planned for Camel 2.0
*/
public interface AsyncProcessor extends Processor {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java Fri May 1 09:39:28 2009
@@ -16,12 +16,10 @@
*/
package org.apache.camel.component.direct;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,7 +28,7 @@
*
* @version $Revision$
*/
-public class DirectProducer extends DefaultProducer implements AsyncProcessor {
+public class DirectProducer extends DefaultProducer implements Processor {
private static final transient Log LOG = LogFactory.getLog(DirectProducer.class);
private DirectEndpoint endpoint;
@@ -49,25 +47,4 @@
}
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
- int size = endpoint.getConsumers().size();
- if (size == 0) {
- LOG.warn("No consumers available on endpoint: " + endpoint + " to process " + exchange);
- } else if (size == 1) {
- DefaultConsumer consumer = endpoint.getConsumers().get(0);
- AsyncProcessor processor = AsyncProcessorTypeConverter.convert(consumer.getProcessor());
- return processor.process(exchange, callback);
- } else if (size > 1) {
- // Too hard to do multiple async.. do it sync
- try {
- for (DefaultConsumer consumer : endpoint.getConsumers()) {
- consumer.getProcessor().process(exchange);
- }
- } catch (Exception e) {
- exchange.setException(e);
- }
- }
- callback.done(true);
- return true;
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Fri May 1 09:39:28 2009
@@ -20,12 +20,10 @@
import java.util.Collections;
import java.util.List;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.processor.DeadLetterChannel;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -130,61 +128,67 @@
try {
final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
- if (processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile())) {
+ boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile());
+ if (!begin) {
+ log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile());
+ return;
+ }
- // must use file from exchange as it can be updated due the
- // preMoveNamePrefix/preMoveNamePostfix options
- final GenericFile<T> target = exchange.getGenericFile();
- // must use full name when downloading so we have the correct path
- final String name = target.getAbsoluteFilePath();
-
- // retrieve the file using the stream
- if (log.isTraceEnabled()) {
- log.trace("Retreiving file: " + name + " from: " + endpoint);
- }
+ // must use file from exchange as it can be updated due the
+ // preMoveNamePrefix/preMoveNamePostfix options
+ final GenericFile<T> target = exchange.getGenericFile();
+ // must use full name when downloading so we have the correct path
+ final String name = target.getAbsoluteFilePath();
+
+ // retrieve the file using the stream
+ if (log.isTraceEnabled()) {
+ log.trace("Retreiving file: " + name + " from: " + endpoint);
+ }
- operations.retrieveFile(name, exchange);
+ operations.retrieveFile(name, exchange);
- if (log.isTraceEnabled()) {
- log.trace("Retrieved file: " + name + " from: " + endpoint);
- }
+ if (log.isTraceEnabled()) {
+ log.trace("Retrieved file: " + name + " from: " + endpoint);
+ }
- if (log.isDebugEnabled()) {
- log.debug("About to process file: " + target + " using exchange: " + exchange);
- }
- // Use the async processor interface so that processing of
- // the exchange can happen asynchronously
- getAsyncProcessor().process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- final GenericFile<T> file = exchange.getGenericFile();
- boolean failed = exchange.isFailed();
-
- if (log.isDebugEnabled()) {
- log.debug("Done processing file: " + file + " using exchange: " + exchange);
- }
-
- boolean committed = false;
- try {
- if (!failed) {
- // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
- processStrategyCommit(processStrategy, exchange, file);
- committed = true;
- } else {
- if (exchange.getException() != null) {
- // if the failure was an exception then handle it
- handleException(exchange.getException());
- }
- }
- } finally {
- if (!committed) {
- processStrategyRollback(processStrategy, exchange, file);
- }
- }
+ if (log.isDebugEnabled()) {
+ log.debug("About to process file: " + target + " using exchange: " + exchange);
+ }
+ // process the exchange synchronously in the current thread; any exception thrown
+ // is stored on the exchange so the commit or rollback below can act on it
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // after processing
+ final GenericFile<T> file = exchange.getGenericFile();
+ boolean failed = exchange.isFailed();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Done processing file: " + file + " using exchange: " + exchange);
+ }
+
+ // commit or rollback
+ boolean committed = false;
+ try {
+ if (!failed) {
+ // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
+ processStrategyCommit(processStrategy, exchange, file);
+ committed = true;
+ } else {
+ if (exchange.getException() != null) {
+ // if the failure was an exception then handle it
+ handleException(exchange.getException());
}
- });
- } else {
- log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile());
+ }
+ } finally {
+ if (!committed) {
+ processStrategyRollback(processStrategy, exchange, file);
+ }
}
+
} catch (Exception e) {
handleException(e);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java Fri May 1 09:39:28 2009
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.file;
-import java.io.FileNotFoundException;
-
import org.apache.camel.Converter;
import org.apache.camel.Exchange;
import org.apache.camel.FallbackConverter;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Fri May 1 09:39:28 2009
@@ -42,7 +42,9 @@
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- queue.add(exchange.copy());
+ Exchange copy = exchange.copy();
+ copy.setProperty("CamelAsyncCallback", callback);
+ queue.add(copy);
callback.done(true);
return true;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri May 1 09:39:28 2009
@@ -67,13 +67,18 @@
if (exchange != null) {
if (isRunAllowed()) {
try {
- processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- }
- });
+ processor.process(exchange);
} catch (Exception e) {
LOG.error("Seda queue caught: " + e, e);
}
+
+ // TODO: It ought to be UnitOfWork that did the callback notification but we are planning
+ // to replace it with a brand new Async API so we leave it as is
+ AsyncCallback callback = exchange.getProperty("CamelAsyncCallback", AsyncCallback.class);
+ if (callback != null) {
+ // seda consumer is async so invoke the async done on the callback if it's provided
+ callback.done(false);
+ }
} else {
LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
try {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri May 1 09:39:28 2009
@@ -379,11 +379,8 @@
answer = createEndpoint(uri);
}
- // If it's a singleton then auto register it.
- // TODO: Why not test for isSingleton?
if (answer != null) {
addService(answer);
-
endpoints.put(getEndpointKey(uri, answer), answer);
lifecycleStrategy.onEndpointAdd(answer);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Fri May 1 09:39:28 2009
@@ -56,7 +56,8 @@
/**
* Provides an {@link AsyncProcessor} interface to the configured
* processor on the consumer. If the processor does not implement
- * the interface, it will be adapted so that it does.
+ * the interface, it will be adapted so that it does.
+ * @deprecated a new async API is planned for Camel 2.0
*/
public AsyncProcessor getAsyncProcessor() {
if (asyncProcessor == null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Fri May 1 09:39:28 2009
@@ -21,14 +21,12 @@
import java.util.List;
import java.util.Map;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Intercept;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
@@ -136,11 +134,8 @@
if (!eventDrivenProcessors.isEmpty()) {
Processor processor = Pipeline.newInstance(eventDrivenProcessors);
- // lets create the async processor
- final AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
-
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
- Processor unitOfWorkProcessor = new UnitOfWorkProcessor(asyncProcessor);
+ Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
// and create the route that wraps the UoW
Route edcr = new EventDrivenConsumerRoute(getEndpoint(), unitOfWorkProcessor);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Fri May 1 09:39:28 2009
@@ -20,7 +20,6 @@
import java.util.Collections;
import java.util.List;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
import org.apache.camel.model.ProcessorDefinition;
@@ -38,7 +37,6 @@
private String id;
private List<Synchronization> synchronizations;
- private List<AsyncCallback> asyncCallbacks;
private List<ProcessorDefinition> routeList;
public DefaultUnitOfWork() {
@@ -53,9 +51,6 @@
if (synchronizations != null) {
synchronizations.clear();
}
- if (asyncCallbacks != null) {
- asyncCallbacks.clear();
- }
if (routeList != null) {
routeList.clear();
}
@@ -87,10 +82,6 @@
}
}
- public boolean isSynchronous() {
- return asyncCallbacks == null || asyncCallbacks.isEmpty();
- }
-
public String getId() {
if (id == null) {
id = DEFAULT_ID_GENERATOR.generateId();
@@ -116,27 +107,4 @@
return Collections.unmodifiableList(routeList);
}
- /**
- * Register some asynchronous processing step
- */
- /*
- public synchronized AsyncCallback addAsyncStep() {
- AsyncCallback answer = new AsyncCallback() {
- public void done(boolean doneSynchronously) {
- latch.countDown();
- }
- };
- if (latch == null) {
- latch = new CountDownLatch(1);
- }
- else {
- // TODO increment latch!
- }
- if (asyncCallbacks == null) {
- asyncCallbacks = new ArrayList<AsyncCallback>();
- }
- asyncCallbacks.add(answer);
- return answer;
- }
- */
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Fri May 1 09:39:28 2009
@@ -16,11 +16,8 @@
*/
package org.apache.camel.management;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,19 +27,19 @@
*
* @version $Revision$
*/
-public class InstrumentationProcessor extends DelegateProcessor implements AsyncProcessor {
+public class InstrumentationProcessor extends DelegateProcessor {
private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
private PerformanceCounter counter;
private String type;
+ public InstrumentationProcessor() {
+ }
+
public InstrumentationProcessor(PerformanceCounter counter) {
this.counter = counter;
}
- public InstrumentationProcessor() {
- }
-
@Override
public String toString() {
return "Instrumention" + (type != null ? ":" + type : "") + "[" + processor + "]";
@@ -53,42 +50,26 @@
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
+ if (processor != null) {
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- if (processor == null) {
- // no processor so nothing to process, so return
- callback.done(true);
- return true;
- }
-
- final long startTime = System.nanoTime();
-
- if (processor instanceof AsyncProcessor) {
- return ((AsyncProcessor)processor).process(exchange, new AsyncCallback() {
- public void done(boolean doneSynchronously) {
- if (counter != null) {
- // convert nanoseconds to milliseconds
- recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
- }
- callback.done(doneSynchronously);
- }
- });
- }
-
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ // TODO: why not use millis instead of nano?
- if (counter != null) {
- // convert nanoseconds to milliseconds
- recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
+ long startTime = 0;
+ if (counter != null) {
+ startTime = System.nanoTime();
+ }
+
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ if (counter != null) {
+ // convert nanoseconds to milliseconds
+ recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
+ }
}
- callback.done(true);
- return true;
}
protected void recordTime(Exchange exchange, double duration) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java Fri May 1 09:39:28 2009
@@ -116,8 +116,7 @@
thread.setTaskQueue(taskQueue);
thread.setThreadGroup(threadGroup);
- // TODO: see if we can avoid creating so many nested pipelines
- ArrayList<Processor> pipe = new ArrayList<Processor>(2);
+ List<Processor> pipe = new ArrayList<Processor>(2);
pipe.add(thread);
pipe.add(createOutputsProcessor(routeContext, outputs));
return new Pipeline(pipe);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java Fri May 1 09:39:28 2009
@@ -17,13 +17,11 @@
package org.apache.camel.model.loadbalancer;
import java.util.List;
-
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.model.IdentifiedType;
@@ -131,18 +129,4 @@
loadBalancer.process(exchange);
}
- public boolean process(Exchange exchange, final AsyncCallback callback) {
- ObjectHelper.notNull(loadBalancer, "loadBalancer");
-
- return loadBalancer.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // Only handle the async case...
- if (!sync) {
- callback.done(sync);
- }
- }
- });
-
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri May 1 09:39:28 2009
@@ -16,21 +16,15 @@
*/
package org.apache.camel.processor;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.RejectedExecutionException;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
@@ -43,16 +37,20 @@
*
* @version $Revision$
*/
-public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
+public class DeadLetterChannel extends ErrorHandlerSupport implements Processor {
- // TODO: The code in this class will be much easier when the AsyncProcessor is removed
+ // TODO: Introduce option to allow async redelivery, e.g. to not block the thread while delaying
+ // (e.g. the Timer task code). However we should consider using Channels that have internal
+ // producer/consumer queues with "delayed" support so a redelivery is just to move an
+ // exchange to this channel with the computed delay time.
+ // We need to provide an option so end users can decide if they would like to spawn an async thread
+ // or not. Also consider the MEP, as InOut does not work with async since the original caller thread
+ // is expecting a reply in the sync thread.
// we can use a single shared static timer for async redeliveries
- private static final Timer REDELIVER_TIMER = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
private final Processor deadLetter;
private final String deadLetterUri;
private final Processor output;
- private final AsyncProcessor outputAsync;
private final Processor redeliveryProcessor;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
@@ -70,40 +68,6 @@
Processor onRedeliveryProcessor = redeliveryProcessor;
}
- private class RedeliverTimerTask extends TimerTask {
- private final Exchange exchange;
- private final AsyncCallback callback;
- private final RedeliveryData data;
-
- public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
- this.exchange = exchange;
- this.callback = callback;
- this.data = data;
- }
-
- @Override
- public void run() {
- //only handle the real AsyncProcess the exchange
- outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // Only handle the async case...
- if (sync) {
- return;
- }
- data.sync = false;
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
- // deliver to async to process it
- asyncProcess(exchange, callback, data);
- } else {
- callback.done(sync);
- }
- }
- });
- }
- }
-
/**
* Creates the dead letter channel.
*
@@ -121,7 +85,6 @@
this.deadLetter = deadLetter;
this.deadLetterUri = deadLetterUri;
this.redeliveryProcessor = redeliveryProcessor;
- this.outputAsync = AsyncProcessorTypeConverter.convert(output);
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
setExceptionPolicy(exceptionPolicyStrategy);
@@ -137,17 +100,13 @@
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(Exchange exchange, final AsyncCallback callback) {
- return processErrorHandler(exchange, callback, new RedeliveryData());
+ processErrorHandler(exchange, new RedeliveryData());
}
/**
* Processes the exchange decorated with this dead letter channel.
*/
- protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+ protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) {
while (true) {
// we can't keep retrying if the route is being shutdown.
@@ -158,8 +117,6 @@
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
- callback.done(data.sync);
- return data.sync;
}
// do not handle transacted exchanges that failed as this error handler does not support it
@@ -168,7 +125,7 @@
log.debug("This error handler does not support transacted exchanges."
+ " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
}
- return data.sync;
+ return;
}
// did previous processing caused an exception?
@@ -179,7 +136,9 @@
// compute if we should redeliver or not
boolean shouldRedeliver = shouldRedeliver(exchange, data);
if (!shouldRedeliver) {
- return deliverToFaultProcessor(exchange, callback, data);
+ deliverToFaultProcessor(exchange, data);
+ // we should not try redeliver so we are finished
+ return;
}
// if we are redelivering then sleep before trying again
@@ -196,84 +155,25 @@
}
// letting onRedeliver be executed
- deliverToRedeliveryProcessor(exchange, callback, data);
+ deliverToRedeliveryProcessor(exchange, data);
}
// process the exchange
- boolean sync = outputAsync.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // Only handle the async case...
- if (sync) {
- return;
- }
- data.sync = false;
- // only process if the exchange hasn't failed
- // and it has not been handled by the error processor
- if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
- //TODO Call the Timer for the asyncProcessor
- asyncProcess(exchange, callback, data);
- } else {
- callback.done(sync);
- }
- }
- });
- if (!sync) {
- // It is going to be processed async..
- return false;
- }
- if (exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange)) {
- // If everything went well.. then we exit here..
- callback.done(true);
- return true;
- }
- // error occurred so loop back around.....
- }
-
- }
-
- protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
- // set the timer here
- if (!isRunAllowed()) {
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
+ try {
+ output.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
}
- callback.done(data.sync);
- return;
- }
- // do not handle transacted exchanges that failed as this error handler does not support it
- if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
- if (log.isDebugEnabled()) {
- log.debug("This error handler does not support transacted exchanges."
- + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
+ // only process if the exchange hasn't failed
+ // and it has not been handled by the error processor
+ boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
+ if (done) {
+ return;
}
- return;
- }
-
- // did previous processing caused an exception?
- if (exchange.getException() != null) {
- handleException(exchange, data);
- }
-
- // compute if we should redeliver or not
- boolean shouldRedeliver = shouldRedeliver(exchange, data);
- if (!shouldRedeliver) {
- deliverToFaultProcessor(exchange, callback, data);
- return;
+ // error occurred so loop back around.....
}
-
- // process the next try
- // if we are redelivering then sleep before trying again
- if (data.redeliveryCounter > 0) {
- prepareExchangeForRedelivery(exchange);
-
- // wait until we should redeliver using a timer to avoid thread blocking
- data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
- REDELIVER_TIMER.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
- // letting onRedeliver be executed
- deliverToRedeliveryProcessor(exchange, callback, data);
- }
}
// Properties
@@ -369,8 +269,7 @@
* Gives an optional configure redelivery processor a chance to process before the Exchange
* will be redelivered. This can be used to alter the Exchange.
*/
- private void deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
- final RedeliveryData data) {
+ private void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
if (data.onRedeliveryProcessor == null) {
return;
}
@@ -379,41 +278,39 @@
log.trace("RedeliveryProcessor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
}
- AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.onRedeliveryProcessor);
- afp.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- log.trace("Redelivery processor done");
- // do NOT call done on callback as this is the redelivery processor that
- // is done. we should not mark the entire exchange as done.
- }
- });
+ try {
+ data.onRedeliveryProcessor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ log.trace("Redelivery processor done");
}
/**
* All redelivery attempts failed so move the exchange to the fault processor (eg the dead letter queue)
*/
- private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
- final RedeliveryData data) {
+ private void deliverToFaultProcessor(final Exchange exchange, final RedeliveryData data) {
+ if (data.failureProcessor == null) {
+ return;
+ }
+
// we did not success with the redelivery so now we let the failure processor handle it
ExchangeHelper.setFailureHandled(exchange);
// must decrement the redelivery counter as we didn't process the redelivery but is
// handling by the failure handler. So we must -1 to not let the counter be out-of-sync
decrementRedeliveryCounter(exchange);
- AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
- boolean sync = afp.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- log.trace("Fault processor done");
- prepareExchangeForFailure(exchange, data.handledPredicate);
- callback.done(data.sync);
- }
- });
+ try {
+ data.failureProcessor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ log.trace("Fault processor done");
+ prepareExchangeForFailure(exchange, data.handledPredicate);
String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
+ ". Handled by the failure processor: " + data.failureProcessor;
logFailedDelivery(false, exchange, msg, data, null);
-
- return sync;
}
private void prepareExchangeForFailure(Exchange exchange, Predicate handledPredicate) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri May 1 09:39:28 2009
@@ -19,8 +19,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Channel;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -28,7 +26,6 @@
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
/**
@@ -43,7 +40,7 @@
*
* @version $Revision$
*/
-public class DefaultChannel extends ServiceSupport implements AsyncProcessor, Channel {
+public class DefaultChannel extends ServiceSupport implements Processor, Channel {
private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
private Processor errorHandler;
@@ -141,24 +138,10 @@
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
Processor processor = getOutput();
-
- if (processor instanceof AsyncProcessor) {
- return ((AsyncProcessor) processor).process(exchange, callback);
- } else if (processor != null) {
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ if (processor != null) {
+ processor.process(exchange);
}
-
- callback.done(true);
- return true;
}
@Override
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Fri May 1 09:39:28 2009
@@ -16,15 +16,11 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ServiceHelper;
@@ -34,11 +30,11 @@
*
* @version $Revision$
*/
-public class DefaultErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
- private AsyncProcessor output;
+public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor {
+ private Processor output;
public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) {
- this.output = AsyncProcessorTypeConverter.convert(output);
+ this.output = output;
setExceptionPolicy(exceptionPolicyStrategy);
}
@@ -52,32 +48,28 @@
}
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
+ try {
+ output.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- return output.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
-
- // do not handle transacted exchanges as this error handler does not support it
- boolean handle = true;
- if (exchange.isTransacted() && !supportTransacted()) {
- handle = false;
- if (log.isDebugEnabled()) {
- log.debug("This error handler does not support transacted exchanges."
- + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
- }
- }
-
- if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
- handleException(exchange);
- }
- callback.done(sync);
+ // do not handle transacted exchanges as this error handler does not support it
+ boolean handle = true;
+ if (exchange.isTransacted() && !supportTransacted()) {
+ handle = false;
+ if (log.isDebugEnabled()) {
+ log.debug("This error handler does not support transacted exchanges."
+ + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
}
- });
+ }
+
+ if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
+ handleException(exchange);
+ }
}
- private void handleException(Exchange exchange) {
+ private void handleException(Exchange exchange) throws Exception {
Exception e = exchange.getException();
// store the original caused exception in a property, so we can restore it later
@@ -110,12 +102,8 @@
MessageHelper.resetStreamCache(exchange.getIn());
}
- private boolean deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) {
- AsyncProcessor afp = AsyncProcessorTypeConverter.convert(failureProcessor);
- return afp.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- }
- });
+ private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception {
+ failureProcessor.process(exchange);
}
private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java Fri May 1 09:39:28 2009
@@ -31,6 +31,8 @@
/**
* A Delegate pattern which delegates processing to a nested AsyncProcessor which can
* be useful for implementation inheritance when writing an {@link org.apache.camel.spi.Policy}
+ *
+ * @deprecated
*/
public class DelegateAsyncProcessor extends ServiceSupport implements AsyncProcessor, Navigate {
protected AsyncProcessor processor;
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java Fri May 1 09:39:28 2009
@@ -16,12 +16,11 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
-public class ExchangePatternProcessor implements AsyncProcessor {
+public class ExchangePatternProcessor implements Processor {
private ExchangePattern exchangePattern = ExchangePattern.InOnly;
public ExchangePatternProcessor() {
@@ -39,10 +38,4 @@
exchange.setPattern(exchangePattern);
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
- exchange.setPattern(exchangePattern);
- callback.done(true);
- return true;
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri May 1 09:39:28 2009
@@ -50,6 +50,8 @@
*/
public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
+ // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+
static class ProcessorExchangePair {
private final Processor processor;
private final Exchange exchange;
@@ -163,7 +165,7 @@
exchanges.add(subExchange);
completedExchanges.increment();
ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback() {
- public void done(boolean doneSynchronously) {
+ public void done(boolean sync) {
if (streaming && aggregationStrategy != null) {
doAggregate(result, subExchange);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Fri May 1 09:39:28 2009
@@ -40,6 +40,8 @@
public class Pipeline extends MulticastProcessor implements AsyncProcessor {
private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
+ // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+
public Pipeline(Collection<Processor> processors) {
super(processors);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Fri May 1 09:39:28 2009
@@ -16,17 +16,14 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.Service;
import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,11 +32,10 @@
*
* @version $Revision$
*/
-public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
+public class SendProcessor extends ServiceSupport implements Processor {
protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
protected Endpoint destination;
protected Producer producer;
- protected AsyncProcessor processor;
protected ExchangePattern pattern;
public SendProcessor(Endpoint destination) {
@@ -70,40 +66,17 @@
}
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
- if (producer == null) {
- if (isStopped()) {
- LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
- } else {
- exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
- }
- callback.done(true);
- return true;
- } else {
- exchange = configureExchange(exchange);
- return processor.process(exchange, callback);
- }
- }
-
public Endpoint getDestination() {
return destination;
}
protected void doStart() throws Exception {
this.producer = destination.createProducer();
- this.producer.start();
- this.processor = AsyncProcessorTypeConverter.convert(producer);
+ ServiceHelper.startService(this.producer);
}
protected void doStop() throws Exception {
- if (producer != null) {
- try {
- producer.stop();
- } finally {
- producer = null;
- processor = null;
- }
- }
+ ServiceHelper.stopService(this.producer);
}
protected Exchange configureExchange(Exchange exchange) {
@@ -113,8 +86,4 @@
return exchange;
}
- public Processor getProcessor() {
- return processor;
- }
-
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Fri May 1 09:39:28 2009
@@ -16,9 +16,8 @@
*/
package org.apache.camel.processor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultUnitOfWork;
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
@@ -26,9 +25,9 @@
* Handles calling the UnitOfWork.done() method when processing of an exchange
* is complete.
*/
-public final class UnitOfWorkProcessor extends DelegateAsyncProcessor {
+public final class UnitOfWorkProcessor extends DelegateProcessor {
- public UnitOfWorkProcessor(AsyncProcessor processor) {
+ public UnitOfWorkProcessor(Processor processor) {
super(processor);
}
@@ -36,8 +35,9 @@
public String toString() {
return "UnitOfWork(" + processor + ")";
}
-
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+ @Override
+ protected void processNext(Exchange exchange) throws Exception {
if (exchange.getUnitOfWork() == null) {
// If there is no existing UoW, then we should start one and
// terminate it once processing is completed for the exchange.
@@ -48,26 +48,26 @@
} catch (Exception e) {
throw wrapRuntimeCamelException(e);
}
- // return the process code where we do stop and cleanup
- return processor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // Order here matters. We need to complete the callbacks
- // since they will likely update the exchange with
- // some final results.
- callback.done(sync);
- exchange.getUnitOfWork().done(exchange);
- try {
- uow.stop();
- } catch (Exception e) {
- throw wrapRuntimeCamelException(e);
- }
- exchange.setUnitOfWork(null);
- }
- });
+
+ // process the exchange
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // unit of work is done
+ exchange.getUnitOfWork().done(exchange);
+ try {
+ uow.stop();
+ } catch (Exception e) {
+ throw wrapRuntimeCamelException(e);
+ }
+ exchange.setUnitOfWork(null);
} else {
// There was an existing UoW, so we should just pass through..
// so that the guy the initiated the UoW can terminate it.
- return processor.process(exchange, callback);
+ processor.process(exchange);
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri May 1 09:39:28 2009
@@ -21,7 +21,6 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -96,39 +95,6 @@
}
}
- public boolean process(Exchange exchange, final AsyncCallback callback) {
- if (producer == null) {
- if (isStopped()) {
- LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
- } else {
- exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
- }
- callback.done(true);
- return true;
- } else {
- exchange = configureExchange(exchange);
-
- final Exchange wireTapExchange = configureExchange(exchange);
-
- // use submit instead of execute to force it to use a new thread, execute might
- // decide to use current thread, so we must submit a new task
- // as we dont care for the response we dont hold the future object and wait for the result
- getExecutorService().submit(new Callable<Object>() {
- public Object call() throws Exception {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Processing wiretap: " + wireTapExchange);
- }
- return processor.process(wireTapExchange, callback);
- }
- });
-
- // return true to indicate caller its okay, and he should not wait as this wiretap
- // is a fire and forget
- return true;
- }
- }
-
-
@Override
protected Exchange configureExchange(Exchange exchange) {
if (newExchangeProcessor == null && newExchangeExpression == null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java Fri May 1 09:39:28 2009
@@ -16,16 +16,12 @@
*/
package org.apache.camel.processor.interceptor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
-public class HandleFaultInterceptor extends DelegateProcessor implements AsyncProcessor {
+public class HandleFaultInterceptor extends DelegateProcessor {
public HandleFaultInterceptor() {
super();
@@ -43,23 +39,8 @@
@Override
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
if (processor == null) {
- // no processor so nothing to process, so return
- callback.done(true);
- return true;
- }
-
- if (processor instanceof AsyncProcessor) {
- return ((AsyncProcessor) processor).process(exchange, new AsyncCallback() {
- public void done(boolean doneSynchronously) {
- handleFault(exchange);
- callback.done(doneSynchronously);
- }
- });
+ return;
}
try {
@@ -67,10 +48,8 @@
} catch (Exception e) {
exchange.setException(e);
}
- handleFault(exchange);
- callback.done(true);
- return true;
+ handleFault(exchange);
}
/**
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Fri May 1 09:39:28 2009
@@ -16,19 +16,16 @@
*/
package org.apache.camel.processor.interceptor;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.StreamCache;
import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.MessageHelper;
/**
* {@link DelegateProcessor} that converts a message into a re-readable format
*/
-public class StreamCachingInterceptor extends DelegateProcessor implements AsyncProcessor {
+public class StreamCachingInterceptor extends DelegateProcessor {
public StreamCachingInterceptor() {
super();
@@ -46,31 +43,13 @@
@Override
public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- public boolean process(Exchange exchange, AsyncCallback callback) {
StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
if (newBody != null) {
exchange.getIn().setBody(newBody);
}
MessageHelper.resetStreamCache(exchange.getIn());
- return proceed(exchange, callback);
+ getProcessor().process(exchange);
}
- public boolean proceed(Exchange exchange, AsyncCallback callback) {
- if (getProcessor() instanceof AsyncProcessor) {
- return ((AsyncProcessor) getProcessor()).process(exchange, callback);
- } else {
- try {
- getProcessor().process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- // false means processing of the exchange asynchronously,
- callback.done(true);
- return true;
- }
- }
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Fri May 1 09:39:28 2009
@@ -18,8 +18,6 @@
import java.util.List;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.util.ObjectHelper;
@@ -83,13 +81,7 @@
}
}
- public boolean process(Exchange exchange, final AsyncCallback callback) {
- return processExchange(0, exchange, callback);
- }
-
- public boolean processExchange(final int index, final Exchange exchange, final AsyncCallback callback) {
- boolean sync;
-
+ protected void processExchange(final int index, final Exchange exchange) {
List<Processor> list = getProcessors();
if (list.isEmpty()) {
throw new IllegalStateException("No processors available to process " + exchange);
@@ -99,33 +91,18 @@
if (processor == null) {
throw new IllegalStateException("No processors could be chosen to process " + exchange);
}
- if (processor instanceof AsyncProcessor) {
- AsyncProcessor asyncProcessor = (AsyncProcessor) processor;
- sync = asyncProcessor.process(exchange, new AsyncCallback() {
- public void done(boolean doSync) {
- // check the exchange and call the FailOverProcessor
- if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
- exchange.setException(null);
- processExchange(index + 1, exchange, callback);
- } else {
- callback.done(doSync);
- }
- }
- });
- } else {
- try {
- processor.process(exchange);
- } catch (Exception ex) {
- exchange.setException(ex);
- }
- if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
- exchange.setException(null);
- processExchange(index + 1, exchange, callback);
- }
- sync = true;
- callback.done(true);
+
+ try {
+ processor.process(exchange);
+ } catch (Exception ex) {
+ exchange.setException(ex);
+ }
+
+ if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
+ exchange.setException(null);
+ processExchange(index + 1, exchange);
}
- return sync;
+
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java Fri May 1 09:39:28 2009
@@ -18,7 +18,6 @@
import java.util.List;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Processor;
/**
@@ -26,7 +25,8 @@
*
* @version $Revision$
*/
-public interface LoadBalancer extends AsyncProcessor {
+public interface LoadBalancer extends Processor {
+
/**
* Adds a new processor to the load balancer
*
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Fri May 1 09:39:28 2009
@@ -18,8 +18,6 @@
import java.util.List;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -43,39 +41,6 @@
processor.process(exchange);
}
}
-
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- boolean sync = false;
- List<Processor> list = getProcessors();
- if (list.isEmpty()) {
- throw new IllegalStateException("No processors available to process " + exchange);
- }
- Processor processor = chooseProcessor(list, exchange);
- if (processor == null) {
- throw new IllegalStateException("No processors could be chosen to process " + exchange);
- } else {
- if (processor instanceof AsyncProcessor) {
- AsyncProcessor asyncProcessor = (AsyncProcessor)processor;
- sync = asyncProcessor.process(exchange, new AsyncCallback() {
- public void done(boolean sync) {
- // Only handle the async case...
- if (!sync) {
- callback.done(sync);
- }
- }
- });
- } else {
- try {
- processor.process(exchange);
- } catch (Exception ex) {
- exchange.setException(ex);
- }
- callback.done(false);
- }
- }
- return sync;
-
- }
protected abstract Processor chooseProcessor(List<Processor> processors, Exchange exchange);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java Fri May 1 09:39:28 2009
@@ -18,7 +18,6 @@
import java.util.List;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -52,17 +51,4 @@
return exchange.copy();
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
- List<Processor> list = getProcessors();
- for (Processor processor : list) {
- Exchange copy = copyExchangeStrategy(processor, exchange);
- try {
- processor.process(copy);
- } catch (Exception ex) {
- // We don't handle the exception here
- }
- }
- callback.done(false);
- return false;
- }
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java Fri May 1 09:39:28 2009
@@ -61,9 +61,7 @@
// the first is when we have finished sending to the seda producer
if (sync) {
doneSync = true;
- }
- // and the async should occur when the mock endpoint is done
- if (!sync) {
+ } else {
latchAsync.countDown();
}
}
@@ -77,14 +75,14 @@
assertEquals("Send should occur before processor", "sendprocess", route);
assertTrue("Sync done should have occured", doneSync);
- // TODO: The AsyncProcessor does not work as expected
// wait at most 2 seconds
boolean zero = latchAsync.await(2, TimeUnit.SECONDS);
- // assertTrue("Async done should have occured", zero);
+ assertTrue("Async done should have occured", zero);
// how to get the response?
String response = exchange.getOut().getBody(String.class);
- // assertEquals("Bye World", response);
+ // TODO: we need a new API for getting the result
+ //assertEquals("Bye World", response);
}
@Override
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java Fri May 1 09:39:28 2009
@@ -76,7 +76,7 @@
for (int i = 0; i < ITERS; i++) {
template.send(e, new SendingProcessor(i), new AsyncCallback() {
- public void done(boolean arg0) {
+ public void done(boolean sync) {
// Do nothing here
}
});
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Fri May 1 09:39:28 2009
@@ -87,7 +87,6 @@
final Exchange exchanges[] = new Exchange[exchangeCount];
for (int i = 0; i < exchangeCount; i++) {
- final int index = i;
// Send the exchange using the async completion interface.
// This call returns before the exchange is completed.
exchanges[i] = template.send("direct:a", new Processor() {
@@ -98,7 +97,6 @@
}
}, new AsyncCallback() {
public void done(boolean doneSynchronously) {
- log.debug("Completed: " + index + ", exception: " + exchanges[index].getException());
completedExchanges.countDown();
}
});
@@ -142,9 +140,11 @@
return new RouteBuilder() {
public void configure() {
// START SNIPPET: example
- from("direct:a").thread(1).process(new Processor() {
+ from("direct:a").to("seda:async");
+
+ from("seda:async").thread(1).process(new Processor() {
public void process(Exchange exchange) throws Exception {
- continueProcessing.await();
+ continueProcessing.await(10, TimeUnit.SECONDS);
}
}).to("mock:result");
// END SNIPPET: example