You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/18 15:52:08 UTC
svn commit: r576920 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/component/direct/
main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/
main/java/org/apache/camel/util/ test/java/o...
Author: chirino
Date: Tue Sep 18 06:52:07 2007
New Revision: 576920
URL: http://svn.apache.org/viewvc?rev=576920&view=rev
Log:
- Added support for a thread processor: http://issues.apache.org/activemq/browse/CAMEL-149
- Better Async handling in the DeadLetterChannel and TryProcessor
- Added async support in the direct: endpoint and in the CamelTemplate
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (with props)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java Tue Sep 18 06:52:07 2007
@@ -73,6 +73,19 @@
/**
* Sends an exchange to an endpoint using a supplied
+ * @{link Processor} to populate the exchange. The callback
+ * will be called when the exchange is completed.
+ *
+ * @param endpointUri the endpoint URI to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ */
+ public E send(String endpointUri, Processor processor, AsyncCallback callback) {
+ Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
+ return send(endpoint, processor, callback);
+ }
+
+ /**
+ * Sends an exchange to an endpoint using a supplied
*
* @{link Processor} to populate the exchange
*
@@ -108,6 +121,18 @@
*/
public E send(Endpoint<E> endpoint, Processor processor) {
return producerCache.send(endpoint, processor);
+ }
+
+ /**
+ * Sends an exchange to an endpoint using a supplied
+ * @{link Processor} to populate the exchange. The callback
+ * will be called when the exchange is completed.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ */
+ public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
+ return producerCache.send(endpoint, processor, callback);
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java Tue Sep 18 06:52:07 2007
@@ -16,9 +16,13 @@
*/
package org.apache.camel.component.direct;
+import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -27,6 +31,7 @@
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,6 +42,48 @@
* @version $Revision: 519973 $
*/
public class DirectEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
+
+ private final class DirectProducer extends DefaultProducer implements AsyncProcessor {
+ private DirectProducer(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ if (consumers.isEmpty()) {
+ LOG.warn("No consumers available on " + this + " for " + exchange);
+ } else {
+ for (DefaultConsumer<E> consumer : consumers) {
+ consumer.getProcessor().process(exchange);
+ }
+ }
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ int size = consumers.size();
+ if (size == 0) {
+ LOG.warn("No consumers available on " + this + " for " + exchange);
+ } else {
+ if (size > 1) {
+ // Too hard to do multiple async.. do it sync
+ try {
+ for (DefaultConsumer<E> consumer : consumers) {
+ consumer.getProcessor().process(exchange);
+ }
+ } catch (Throwable error) {
+ exchange.setException(error);
+ }
+ } else {
+ for (DefaultConsumer<E> consumer : consumers) {
+ AsyncProcessor processor = AsyncProcessorTypeConverter.convert(consumer.getProcessor());
+ return processor.process(exchange, callback);
+ }
+ }
+ }
+ callback.done(true);
+ return true;
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(DirectEndpoint.class);
boolean allowMultipleConsumers = true;
@@ -47,21 +94,7 @@
}
public Producer createProducer() throws Exception {
- return new DefaultProducer(this) {
- public void process(Exchange exchange) throws Exception {
- DirectEndpoint.this.process(exchange);
- }
- };
- }
-
- protected void process(Exchange exchange) throws Exception {
- if (consumers.isEmpty()) {
- LOG.warn("No consumers available on " + this + " for " + exchange);
- } else {
- for (DefaultConsumer<E> consumer : consumers) {
- consumer.getProcessor().process(exchange);
- }
- }
+ return new DirectProducer(this);
}
public Consumer<E> createConsumer(Processor processor) throws Exception {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Tue Sep 18 06:52:07 2007
@@ -16,6 +16,15 @@
*/
package org.apache.camel.model;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
@@ -43,13 +52,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.xml.bind.annotation.XmlAttribute;
-import javax.xml.bind.annotation.XmlTransient;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
/**
* @version $Revision: 1.1 $
*/
@@ -848,5 +850,33 @@
}
}
return processor;
+ }
+
+ /**
+ * Causes subsequent processors to be called asynchronously
+ *
+ * @param coreSize the number of threads that will be used to process
+ * messages in subsequent processors.
+ * @return a ThreadType builder that can be used to futher configure the
+ * the thread pool.
+ */
+ public ThreadType thread(int coreSize) {
+ ThreadType answer = new ThreadType(coreSize);
+ addOutput(answer);
+ return answer;
+ }
+
+ /**
+ * Causes subsequent processors to be called asynchronously
+ *
+ * @param executor the executor that will be used to process
+ * messages in subsequent processors.
+ * @return a ThreadType builder that can be used to further configure the
+ * the thread pool.
+ */
+ public ProcessorType<Type> thread(ThreadPoolExecutor executor) {
+ ThreadType answer = new ThreadType(executor);
+ addOutput(answer);
+ return this;
}
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java?rev=576920&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java Tue Sep 18 06:52:07 2007
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.RouteContext;
+import org.apache.camel.processor.Pipeline;
+import org.apache.camel.processor.ThreadProcessor;
+
+/**
+ * Represents an XML <thread/> element
+ * @version $Revision$
+ */
+@XmlRootElement(name = "thread")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ThreadType extends ProcessorType {
+
+ @XmlAttribute
+ private int coreSize = 1;
+ @XmlAttribute
+ private boolean daemon = true;
+ @XmlAttribute
+ private long keepAliveTime;
+ @XmlAttribute
+ private int maxSize = 1;
+ @XmlAttribute
+ private String name = "Thread Processor";
+ @XmlAttribute
+ private int priority = Thread.NORM_PRIORITY;
+ @XmlAttribute
+ private long stackSize;
+ @XmlElementRef
+ private List<ProcessorType> outputs = new ArrayList<ProcessorType>();
+
+ @XmlTransient
+ private BlockingQueue<Runnable> taskQueue;
+ @XmlTransient
+ private ThreadGroup threadGroup;
+ @XmlTransient
+ private ThreadPoolExecutor executor;
+
+ public ThreadType() {
+ }
+
+ public ThreadType(int coreSize) {
+ this.coreSize = coreSize;
+ this.maxSize = coreSize;
+ }
+
+ public ThreadType(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+
+ @Override
+ public List getInterceptors() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public List getOutputs() {
+ return outputs;
+ }
+
+ @Override
+ public String toString() {
+ return "Thread[" + getLabel() + "]";
+ }
+
+ @Override
+ public String getLabel() {
+ return "coreSize="+coreSize;
+ }
+
+ @Override
+ public Processor createProcessor(RouteContext routeContext) throws Exception {
+
+ ThreadProcessor thread = new ThreadProcessor();
+ thread.setExecutor(executor);
+ thread.setCoreSize(coreSize);
+ thread.setDaemon(daemon);
+ thread.setKeepAliveTime(keepAliveTime);
+ thread.setMaxSize(maxSize);
+ thread.setName(name);
+ thread.setPriority(priority);
+ thread.setStackSize(stackSize);
+ thread.setTaskQueue(taskQueue);
+ thread.setThreadGroup(threadGroup);
+
+ // TODO: see if we can avoid creating so many nested pipelines
+
+ ArrayList<Processor> pipe = new ArrayList<Processor>(2);
+ pipe.add(thread);
+ pipe.add(createOutputsProcessor(routeContext, outputs));
+ return new Pipeline(pipe);
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ //
+ // Fluent Methods
+ //
+ ///////////////////////////////////////////////////////////////////
+ public ThreadType coreSize(int coreSize) {
+ setCoreSize(coreSize);
+ return this;
+ }
+
+ public ThreadType daemon(boolean daemon) {
+ setDaemon(daemon);
+ return this;
+ }
+
+ public ThreadType keepAliveTime(long keepAliveTime) {
+ setKeepAliveTime(keepAliveTime);
+ return this;
+ }
+
+ public ThreadType maxSize(int maxSize) {
+ setMaxSize(maxSize);
+ return this;
+ }
+
+ public ThreadType name(String name) {
+ setName(name);
+ return this;
+ }
+
+ public ThreadType priority(int priority) {
+ setPriority(priority);
+ return this;
+ }
+
+ public ThreadType stackSize(long stackSize) {
+ setStackSize(stackSize);
+ return this;
+ }
+
+ public ThreadType taskQueue(BlockingQueue<Runnable> taskQueue) {
+ setTaskQueue(taskQueue);
+ return this;
+ }
+
+ public ThreadType threadGroup(ThreadGroup threadGroup) {
+ setThreadGroup(threadGroup);
+ return this;
+ }
+
+ public ThreadType executor(ThreadPoolExecutor executor) {
+ setExecutor(executor);
+ return this;
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ //
+ // Property Accessors
+ //
+ ///////////////////////////////////////////////////////////////////
+
+ public void setCoreSize(int coreSize) {
+ this.coreSize = coreSize;
+ }
+
+ public void setDaemon(boolean daemon) {
+ this.daemon = daemon;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public void setMaxSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public void setStackSize(long stackSize) {
+ this.stackSize = stackSize;
+ }
+
+ public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
+ this.taskQueue = taskQueue;
+ }
+
+ public void setThreadGroup(ThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ }
+
+ public ThreadPoolExecutor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadType.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Tue Sep 18 06:52:07 2007
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.RejectedExecutionException;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
@@ -23,6 +25,7 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ExceptionType;
+import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +45,7 @@
private class RedeliveryData {
int redeliveryCounter;
long redeliveryDelay;
+ boolean sync = true;
// default behaviour which can be overloaded on a per exception basis
RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
@@ -85,6 +89,16 @@
public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
while (true) {
+
+ // We can't keep retrying if the route is being shutdown.
+ if (isStopping() || isStopped()) {
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(data.sync);
+ return data.sync;
+ }
+
if (exchange.getException() != null) {
Throwable e = exchange.getException();
exchange.setException(null); // Reset it since we are handling it.
@@ -105,7 +119,11 @@
if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
setFailureHandled(exchange, true);
AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
- return afp.process(exchange, callback);
+ return afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ callback.done(data.sync);
+ }
+ });
}
if (data.redeliveryCounter > 0) {
@@ -113,13 +131,15 @@
data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
sleep(data.redeliveryDelay);
}
-
+
+ exchange.setException(null);
boolean sync = outputAsync.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
// Only handle the async case...
if (sync) {
return;
}
+ data.sync = false;
if (exchange.getException() != null) {
process(exchange, callback, data);
} else {
@@ -131,7 +151,7 @@
// It is going to be processed async..
return false;
}
- if (exchange.getException() == null || isFailureHandled(exchange) ) {
+ if (exchange.getException() == null || isFailureHandled(exchange)) {
// If everything went well.. then we exit here..
callback.done(true);
return true;
@@ -141,50 +161,17 @@
}
- private boolean isFailureHandled(Exchange exchange) {
+ public static boolean isFailureHandled(Exchange exchange) {
Boolean rc = exchange.getProperty(FAILURE_HANDLED_PROPERTY, Boolean.class);
return rc == null ? false : rc;
}
- private void setFailureHandled(Exchange exchange, boolean b) {
+ public static void setFailureHandled(Exchange exchange, boolean b) {
exchange.setProperty(FAILURE_HANDLED_PROPERTY, b ? Boolean.TRUE : Boolean.FALSE );
}
public void process(Exchange exchange) throws Exception {
- int redeliveryCounter = 0;
- long redeliveryDelay = 0;
-
- // default behaviour which can be overloaded on a per exception basis
- RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
- Processor failureProcessor = deadLetter;
-
- do {
- if (redeliveryCounter > 0) {
- // Figure out how long we should wait to resend this message.
- redeliveryDelay = currentRedeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
- sleep(redeliveryDelay);
- }
-
- try {
- output.process(exchange);
- return;
- } catch (Throwable e) {
- logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
- redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-
- ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
- if (exceptionPolicy != null) {
- currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
- Processor processor = exceptionPolicy.getErrorHandler();
- if (processor != null) {
- failureProcessor = processor;
- }
- }
- }
- } while (currentRedeliveryPolicy.shouldRedeliver(redeliveryCounter));
-
- // now lets send to the dead letter queue
- failureProcessor.process(exchange);
+ AsyncProcessorHelper.process(this, exchange);
}
// Properties
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java?rev=576920&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java Tue Sep 18 06:52:07 2007
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * A processor that forces async processing of the exchange using a thread pool.
+ *
+ * @version $Revision$
+ */
+public class ThreadProcessor implements AsyncProcessor, Service {
+
+ private ThreadPoolExecutor executor;
+ private long stackSize;
+ private ThreadGroup threadGroup;
+ private int priority = Thread.NORM_PRIORITY;
+ private boolean daemon = true;
+ private String name = "Thread Processor";
+ private BlockingQueue<Runnable> taskQueue;
+ private long keepAliveTime;
+ private int maxSize = 1;
+ private int coreSize = 1;
+ private final AtomicBoolean shutdown = new AtomicBoolean(true);;
+
+ class ProcessCall implements Runnable {
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+
+ public ProcessCall(Exchange exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ }
+
+ public void run() {
+ if( shutdown.get() ) {
+ exchange.setException(new RejectedExecutionException());
+ callback.done(false);
+ } else {
+ callback.done(false);
+ }
+ }
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(final Exchange exchange, final AsyncCallback callback) {
+ ProcessCall call = new ProcessCall(exchange, callback);
+ executor.execute(call);
+ return false;
+ }
+
+ public void start() throws Exception {
+ shutdown.set(false);
+ getExecutor().setRejectedExecutionHandler(new RejectedExecutionHandler() {
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ ProcessCall call = (ProcessCall)runnable;
+ call.exchange.setException(new RejectedExecutionException());
+ call.callback.done(false);
+ }
+ });
+ }
+
+ public void stop() throws Exception {
+ shutdown.set(true);
+ executor.shutdown();
+ executor.awaitTermination(0, TimeUnit.SECONDS);
+ }
+
+ public long getStackSize() {
+ return stackSize;
+ }
+
+ public void setStackSize(long stackSize) {
+ this.stackSize = stackSize;
+ }
+
+ public ThreadGroup getThreadGroup() {
+ return threadGroup;
+ }
+
+ public void setThreadGroup(ThreadGroup threadGroup) {
+ this.threadGroup = threadGroup;
+ }
+
+ public int getPriority() {
+ return priority;
+ }
+
+ public void setPriority(int priority) {
+ this.priority = priority;
+ }
+
+ public boolean isDaemon() {
+ return daemon;
+ }
+
+ public void setDaemon(boolean daemon) {
+ this.daemon = daemon;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public void setKeepAliveTime(long keepAliveTime) {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public int getMaxSize() {
+ return maxSize;
+ }
+
+ public void setMaxSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ public int getCoreSize() {
+ return coreSize;
+ }
+
+ public void setCoreSize(int coreSize) {
+ this.coreSize = coreSize;
+ }
+
+ public BlockingQueue<Runnable> getTaskQueue() {
+ if (taskQueue == null) {
+ taskQueue = new ArrayBlockingQueue<Runnable>(1000);
+ }
+ return taskQueue;
+ }
+
+ public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
+ this.taskQueue = taskQueue;
+ }
+
+ public ThreadPoolExecutor getExecutor() {
+ if (executor == null) {
+ executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() {
+ public Thread newThread(Runnable runnable) {
+ Thread thread;
+ if (getStackSize() > 0) {
+ thread = new Thread(getThreadGroup(), runnable, getName(), getStackSize());
+ } else {
+ thread = new Thread(getThreadGroup(), runnable, getName());
+ }
+ thread.setDaemon(isDaemon());
+ thread.setPriority(getPriority());
+ return thread;
+ }
+ });
+ }
+ return executor;
+ }
+
+ public void setExecutor(ThreadPoolExecutor executor) {
+ this.executor = executor;
+ }
+
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadProcessor.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java Tue Sep 18 06:52:07 2007
@@ -54,12 +54,19 @@
try {
tryProcessor.process(exchange);
e = exchange.getException();
+
+ // Ignore it if it was handled by the dead letter channel.
+ if (e != null && DeadLetterChannel.isFailureHandled(exchange)) {
+ e = null;
+ }
} catch (Exception ex) {
e = ex;
+ exchange.setException(e);
}
if (e != null) {
try {
+ DeadLetterChannel.setFailureHandled(exchange, true);
handleException(exchange, e);
} catch (Exception ex) {
throw ex;
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java?rev=576920&r1=576919&r2=576920&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java Tue Sep 18 06:52:07 2007
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateProducerException;
@@ -27,6 +28,7 @@
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,6 +89,35 @@
/**
* Sends an exchange to an endpoint using a supplied
+ * @{link Processor} to populate the exchange. The callback
+ * will be called when the exchange is completed.
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param processor the transformer used to populate the new exchange
+ */
+ public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
+ try {
+ Producer<E> producer = getProducer(endpoint);
+ E exchange = producer.createExchange();
+ boolean sync = sendExchange(endpoint, producer, processor, exchange, callback);
+ setProcessedSync(exchange, sync);
+ return exchange;
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
+ public static boolean isProcessedSync(Exchange exchange) {
+ Boolean rc = exchange.getProperty(ProducerCache.class.getName() + ".SYNC", Boolean.class);
+ return rc == null ? false : rc;
+ }
+
+ public static void setProcessedSync(Exchange exchange, boolean b) {
+ exchange.setProperty(ProducerCache.class.getName() + ".SYNC", b ? Boolean.TRUE : Boolean.FALSE );
+ }
+
+ /**
+ * Sends an exchange to an endpoint using a supplied
* @{link Processor} to populate the exchange
*
* @param endpoint the endpoint to send the exchange to
@@ -115,6 +146,17 @@
}
producer.process(exchange);
return exchange;
+ }
+
+ protected boolean sendExchange(Endpoint<E> endpoint, Producer<E> producer, Processor processor, E exchange, AsyncCallback callback) throws Exception {
+ // lets populate using the processor callback
+ processor.process(exchange);
+
+ // now lets dispatch
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(">>>> " + endpoint + " " + exchange);
+ }
+ return AsyncProcessorTypeConverter.convert(producer).process(exchange, callback);
}
protected void doStop() throws Exception {
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=576920&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Tue Sep 18 06:52:07 2007
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ThreadTest extends ContextTestSupport {
+
+ protected MockEndpoint resultEndpoint;
+ private CountDownLatch continueProcessing = new CountDownLatch(1);
+
+ public void testSimpleAsyncThreadCase() throws Exception {
+
+ // Send the exchange using the async completion interface.
+ // This call returns before the exchange is completed.
+ template.send("direct:a", new Processor() {
+ public void process(Exchange exchange) {
+ // now lets fire in a message
+ Message in = exchange.getIn();
+ in.setBody(1);
+ }
+ }, new AsyncCallback() {
+ public void done(boolean doneSynchronously) {
+ log.info("Exchange completed.");
+ }
+ });
+
+ // Should not received anything since processing should not be complete.
+ resultEndpoint.expectedMessageCount(0);
+ resultEndpoint.assertIsSatisfied();
+
+ // Release the processing latch..
+ continueProcessing.countDown();
+
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ public void testSimpleSyncThreadCase() throws Exception {
+
+ // Release the processing latch in an async thread.
+ releaseProcessingLatchIn(1000);
+
+ // This call will block until the continueProcessing is released.
+ template.send("direct:a", new Processor() {
+ public void process(Exchange exchange) {
+ // now lets fire in a message
+ Message in = exchange.getIn();
+ in.setBody(1);
+ }
+ });
+
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ public void testQueuedUpExchangesCompleteOnShutdown() throws Exception {
+
+ int exchangeCount = 10;
+ final CountDownLatch completedExchanges = new CountDownLatch(exchangeCount);
+
+ final Exchange exchanges[] = new Exchange[exchangeCount];
+ for (int i = 0; i < exchangeCount; i++) {
+ final int index = i;
+ // Send the exchange using the async completion interface.
+ // This call returns before the exchange is completed.
+ exchanges[i] = template.send("direct:a", new Processor() {
+ public void process(Exchange exchange) {
+ // now lets fire in a message
+ Message in = exchange.getIn();
+ in.setBody(1);
+ }
+ }, new AsyncCallback() {
+ public void done(boolean doneSynchronously) {
+ System.out.println("Completed: "+index+", exception: "+exchanges[index].getException());
+ completedExchanges.countDown();
+ }
+ });
+ }
+
+ // Should not received anything since processing should not be complete.
+ resultEndpoint.expectedMessageCount(0);
+ resultEndpoint.assertIsSatisfied();
+
+ // Release it in a sec
+ releaseProcessingLatchIn(1000);
+ // Make sure we can shut down the context while there are
+ // concurrent requests outstanding.
+ stopCamelContext();
+
+ // All exchanges should get completed..
+ assertTrue(completedExchanges.await(5, TimeUnit.SECONDS));
+ }
+
+ protected void releaseProcessingLatchIn(final long delay) {
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(delay);
+ continueProcessing.countDown();
+ } catch (InterruptedException e) {
+ }
+ }
+ }.start();
+ }
+
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ resultEndpoint = getMockEndpoint("mock:result");
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ inheritErrorHandler(false);
+
+ // START SNIPPET: example
+ from("direct:a").thread(1).process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ continueProcessing.await();
+ }
+ }).to("mock:result");
+ // END SNIPPET: example
+ }
+ };
+ }
+
+}
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date